This is an automated email from the ASF dual-hosted git repository.
casion pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 2228cb4412 [Feature][JDBC] JDBC driver support pull Multiple result
sets (#5173)
2228cb4412 is described below
commit 2228cb44120d70bb37b085ebdb0156d7ee24f96c
Author: peacewong <[email protected]>
AuthorDate: Sat Sep 14 11:36:10 2024 +0800
[Feature][JDBC] JDBC driver support pull Multiple result sets (#5173)
* Support headers params
* once client support submit multiple jobs
* Support select col get result
* JDBC Driver support pull Multiple result sets
* update version to v4
---
.github/workflows/check-license.yml | 2 +-
.../job/interactive/InteractiveJobDesc.java | 11 ++
.../application/operator/ujes/LinkisJobOper.java | 1 +
.../computation/client/LinkisJobBuilder.scala | 2 +-
.../computation/client/LinkisJobClient.scala | 30 +++-
.../client/once/simple/SimpleOnceJob.scala | 13 +-
.../client/once/simple/SimpleOnceJobBuilder.scala | 53 +++++-
.../org/apache/linkis/ujes/client/UJESClient.scala | 5 +-
.../ujes/client/request/JobSubmitAction.scala | 14 ++
.../ujes/client/request/ResultSetAction.scala | 28 ++-
.../linkis/ujes/client/utils/UJESClientUtils.scala | 11 +-
.../org/apache/linkis/ujes/jdbc/UJESSQLDriver.java | 1 +
.../linkis/ujes/jdbc/LinkisSQLConnection.scala | 2 -
.../linkis/ujes/jdbc/LinkisSQLStatement.scala | 48 ++++-
.../linkis/ujes/jdbc/UJESSQLDriverMain.scala | 4 +
.../apache/linkis/ujes/jdbc/UJESSQLResultSet.scala | 57 +++---
.../linkis/ujes/jdbc/LinkisSQLStatementTest.java | 194 ++++++++++++++++++++-
.../linkis/ujes/jdbc/UJESSQLResultSetTest.java | 108 ++++++++++++
18 files changed, 521 insertions(+), 63 deletions(-)
diff --git a/.github/workflows/check-license.yml
b/.github/workflows/check-license.yml
index 3c79607dc3..2a6cf67f23 100644
--- a/.github/workflows/check-license.yml
+++ b/.github/workflows/check-license.yml
@@ -36,7 +36,7 @@ jobs:
echo "rat_file=$rat_file"
if [[ -n "$rat_file" ]];then echo "check error!" && cat $rat_file
&& exit 123;else echo "check success!" ;fi
- name: Upload the report
- uses: actions/upload-artifact@v2
+ uses: actions/upload-artifact@v4
with:
name: license-check-report
path: "**/target/rat.txt"
diff --git
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java
index e594d9cc23..629c466841 100644
---
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java
+++
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/job/interactive/InteractiveJobDesc.java
@@ -30,6 +30,9 @@ public class InteractiveJobDesc {
private Map<String, Object> labelMap;
private Map<String, Object> sourceMap;
+ // 需要加到header中的一些参数
+ private Map<String, String> headers;
+
public String getSubmitUser() {
return submitUser;
}
@@ -101,4 +104,12 @@ public class InteractiveJobDesc {
public void setLabelMap(Map<String, Object> labelMap) {
this.labelMap = labelMap;
}
+
+ public Map<String, String> getHeaders() {
+ return headers;
+ }
+
+ public void setHeaders(Map<String, String> headers) {
+ this.headers = headers;
+ }
}
diff --git
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java
index bfebf62c71..1c17fcd969 100644
---
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java
+++
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/operator/ujes/LinkisJobOper.java
@@ -103,6 +103,7 @@ public class LinkisJobOper implements JobOper {
.setVariableMap(jobDesc.getParamVarsMap())
.setLabels(jobDesc.getLabelMap())
.setSource(jobDesc.getSourceMap())
+ .setHeaders(jobDesc.getHeaders())
.build();
logger.info("Request info to Linkis: \n{}",
CliUtils.GSON.toJson(jobSubmitAction));
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
index 9cc2863559..eff8411603 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobBuilder.scala
@@ -174,7 +174,7 @@ object LinkisJobBuilder {
private var threadPool: ScheduledThreadPoolExecutor = Utils.defaultScheduler
private var serverUrl: String = _
- private var authTokenValue: String = CommonVars[String](
+ var authTokenValue: String = CommonVars[String](
"wds.linkis.client.test.common.tokenValue",
"LINKIS_CLI_TEST"
).getValue // This is the default authToken, we usually suggest set
different ones for users.
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala
index d44c479abb..80e8e7ad42 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/LinkisJobClient.scala
@@ -17,11 +17,37 @@
package org.apache.linkis.computation.client
-import org.apache.linkis.computation.client.interactive.InteractiveJob
-import org.apache.linkis.computation.client.once.OnceJob
+import org.apache.linkis.bml.client.BmlClientFactory
+import org.apache.linkis.computation.client.interactive.{InteractiveJob,
InteractiveJobBuilder}
+import org.apache.linkis.computation.client.once.{LinkisManagerClient, OnceJob}
+import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob,
SimpleOnceJobBuilder}
+import org.apache.linkis.httpclient.dws.config.DWSClientConfig
+import org.apache.linkis.ujes.client.UJESClientImpl
import java.io.Closeable
+class LinkisJobClient(clientConfig: DWSClientConfig) extends Closeable {
+
+ private val ujseClient = new UJESClientImpl(clientConfig)
+
+ private lazy val linkisManagerCLient = LinkisManagerClient(ujseClient)
+
+ override def close(): Unit = {
+ if (null != linkisManagerCLient) {
+ linkisManagerCLient.close()
+ }
+ }
+
+ def onceJobBuilder(): SimpleOnceJobBuilder =
+ SimpleOnceJob.builder(SimpleOnceJobBuilder.getBmlClient(clientConfig),
linkisManagerCLient)
+
+ def interactiveJobBuilder(): InteractiveJobBuilder = {
+ val builder = InteractiveJob.builder()
+ builder.setUJESClient(ujseClient)
+ }
+
+}
+
/**
* This class is only used to provide a unified entry for user to build a
LinkisJob conveniently and
* simply. Please keep this class lightweight enough, do not set too many
field to confuse user.
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
index 4992b17814..13d96c238a 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJob.scala
@@ -17,13 +17,13 @@
package org.apache.linkis.computation.client.once.simple
+import org.apache.linkis.bml.client.BmlClient
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobMetrics
import org.apache.linkis.computation.client.job.AbstractSubmittableLinkisJob
import org.apache.linkis.computation.client.once.{LinkisManagerClient,
OnceJob, SubmittableOnceJob}
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
-import org.apache.linkis.computation.client.once.result.CreateEngineConnResult
import org.apache.linkis.computation.client.operator.OnceJobOperator
import java.util.Locale
@@ -109,15 +109,13 @@ class SubmittableSimpleOnceJob(
with AbstractSubmittableLinkisJob {
private var ecmServiceInstance: ServiceInstance = _
- private var createEngineConnResult: CreateEngineConnResult = _
def getECMServiceInstance: ServiceInstance = ecmServiceInstance
- def getCreateEngineConnResult: CreateEngineConnResult =
createEngineConnResult
override protected def doSubmit(): Unit = {
logger.info(s"Ready to create a engineConn:
${createEngineConnAction.getRequestPayload}.")
- createEngineConnResult =
linkisManagerClient.createEngineConn(createEngineConnAction)
- lastNodeInfo = createEngineConnResult.getNodeInfo
+ val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction)
+ lastNodeInfo = nodeInfo.getNodeInfo
serviceInstance = getServiceInstance(lastNodeInfo)
ticketId = getTicketId(lastNodeInfo)
ecmServiceInstance = getECMServiceInstance(lastNodeInfo)
@@ -160,6 +158,11 @@ object SimpleOnceJob {
def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder
+ def builder(
+ bmlClient: BmlClient,
+ linkisManagerClient: LinkisManagerClient
+ ): SimpleOnceJobBuilder = new SimpleOnceJobBuilder(bmlClient,
linkisManagerClient)
+
/**
* Build a submitted SimpleOnceJob by id and user.
* @param id
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
index dc4451ff0f..d7c4746188 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/computation/client/once/simple/SimpleOnceJobBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.computation.client.once.simple
import org.apache.linkis.bml.client.{BmlClient, BmlClientFactory}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.computation.client.LinkisJobBuilder
+import org.apache.linkis.computation.client.LinkisJobBuilder.clientConfig
import org.apache.linkis.computation.client.once.LinkisManagerClient
import org.apache.linkis.computation.client.once.action.CreateEngineConnAction
import org.apache.linkis.computation.client.once.simple.SimpleOnceJobBuilder._
@@ -28,6 +29,8 @@ import
org.apache.linkis.governance.common.entity.job.OnceExecutorContent
import org.apache.linkis.governance.common.utils.OnceExecutorContentUtils
import
org.apache.linkis.governance.common.utils.OnceExecutorContentUtils.BmlResource
import org.apache.linkis.httpclient.dws.DWSHttpClient
+import
org.apache.linkis.httpclient.dws.authentication.TokenAuthenticationStrategy
+import org.apache.linkis.httpclient.dws.config.{DWSClientConfig,
DWSClientConfigBuilder}
import org.apache.linkis.manager.label.constant.LabelKeyConstant
import org.apache.linkis.protocol.utils.TaskUtils
import org.apache.linkis.ujes.client.exception.UJESJobException
@@ -38,12 +41,19 @@ import java.util
import scala.collection.convert.WrapAsJava._
import scala.collection.convert.WrapAsScala._
-class SimpleOnceJobBuilder private[simple] () extends
LinkisJobBuilder[SubmittableSimpleOnceJob] {
+class SimpleOnceJobBuilder private[simple] (
+ private val bmlClient: BmlClient,
+ private val linkisManagerClient: LinkisManagerClient
+) extends LinkisJobBuilder[SubmittableSimpleOnceJob] {
private var createService: String = _
private var maxSubmitTime: Long = _
private var description: String = _
+ def this() = {
+ this(null, null)
+ }
+
def setCreateService(createService: String): this.type = {
this.createService = createService
this
@@ -69,10 +79,26 @@ class SimpleOnceJobBuilder private[simple] () extends
LinkisJobBuilder[Submittab
val contentMap = OnceExecutorContentUtils.contentToMap(onceExecutorContent)
val bytes = DWSHttpClient.jacksonJson.writeValueAsBytes(contentMap)
val response =
- getBmlClient.uploadResource(executeUser, getFilePath, new
ByteArrayInputStream(bytes))
+ getThisBMLClient.uploadResource(executeUser, getFilePath, new
ByteArrayInputStream(bytes))
OnceExecutorContentUtils.resourceToValue(BmlResource(response.resourceId,
response.version))
}
+ protected def getThisBMLClient(): BmlClient = {
+ if (null == this.bmlClient) {
+ getBmlClient(LinkisJobBuilder.getDefaultClientConfig)
+ } else {
+ this.bmlClient
+ }
+ }
+
+ protected def getThisLinkisManagerClient(): LinkisManagerClient = {
+ if (null == this.linkisManagerClient) {
+ getLinkisManagerClient
+ } else {
+ this.linkisManagerClient
+ }
+ }
+
override def build(): SubmittableSimpleOnceJob = {
ensureNotNull(labels, "labels")
ensureNotNull(jobContent, "jobContent")
@@ -99,7 +125,7 @@ class SimpleOnceJobBuilder private[simple] () extends
LinkisJobBuilder[Submittab
.setMaxSubmitTime(maxSubmitTime)
.setDescription(description)
.build()
- new SubmittableSimpleOnceJob(getLinkisManagerClient,
createEngineConnAction)
+ new SubmittableSimpleOnceJob(getThisLinkisManagerClient,
createEngineConnAction)
}
implicit def toMap(map: util.Map[String, Any]): util.Map[String, String] =
map.map {
@@ -128,10 +154,27 @@ object SimpleOnceJobBuilder {
private var bmlClient: BmlClient = _
private var linkisManagerClient: LinkisManagerClient = _
- def getBmlClient: BmlClient = {
+ def getBmlClient(clientConfig: DWSClientConfig): BmlClient = {
if (bmlClient == null) synchronized {
if (bmlClient == null) {
- bmlClient =
BmlClientFactory.createBmlClient(LinkisJobBuilder.getDefaultClientConfig)
+ val newClientConfig = DWSClientConfigBuilder
+ .newBuilder()
+ .addServerUrl(clientConfig.getServerUrl)
+ .connectionTimeout(clientConfig.getConnectTimeout)
+ .discoveryEnabled(clientConfig.isDiscoveryEnabled)
+ .loadbalancerEnabled(clientConfig.isLoadbalancerEnabled)
+ .maxConnectionSize(clientConfig.getMaxConnection)
+ .retryEnabled(clientConfig.isRetryEnabled)
+ .setRetryHandler(clientConfig.getRetryHandler)
+ .readTimeout(
+ clientConfig.getReadTimeout
+ ) // We think 90s is enough, if SocketTimeoutException is throw,
just set a new clientConfig to modify it.
+ .setAuthenticationStrategy(new TokenAuthenticationStrategy())
+ .setAuthTokenKey(TokenAuthenticationStrategy.TOKEN_KEY)
+ .setAuthTokenValue(LinkisJobBuilder.authTokenValue)
+ .setDWSVersion(clientConfig.getDWSVersion)
+ .build()
+ bmlClient = BmlClientFactory.createBmlClient(newClientConfig)
Utils.addShutdownHook(() => bmlClient.close())
}
}
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala
index 6657b7e4db..19ac7343d8 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/UJESClient.scala
@@ -69,7 +69,10 @@ abstract class UJESClient extends Closeable with Logging {
* @return
*/
def progress(jobExecuteResult: JobExecuteResult): JobProgressResult =
- Utils.tryCatch(executeJobExecIdAction(jobExecuteResult,
JobServiceType.JobProgress)) { t =>
+ Utils.tryCatch(
+ executeJobExecIdAction(jobExecuteResult, JobServiceType.JobProgress)
+ .asInstanceOf[JobProgressResult]
+ ) { t =>
logger.warn("Failed to get progress, return empty progress.", t)
val result = new JobProgressResult
result.setProgress(0)
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala
index f96c6227fe..aba26c619f 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/JobSubmitAction.scala
@@ -25,6 +25,8 @@ import
org.apache.linkis.ujes.client.exception.UJESClientBuilderException
import java.util
+import scala.collection.JavaConverters.mapAsScalaMapConverter
+
class JobSubmitAction private () extends POSTAction with UJESJobAction {
override def suffixURLs: Array[String] = Array("entrance", "submit")
@@ -52,6 +54,8 @@ object JobSubmitAction {
private var source: util.Map[String, AnyRef] = _
+ private var headers: util.Map[String, String] = _
+
def addExecuteCode(executeCode: String): Builder = {
if (null == executionContent) executionContent = new
util.HashMap[String, AnyRef]()
executionContent.put("code", executeCode)
@@ -129,6 +133,11 @@ object JobSubmitAction {
this
}
+ def setHeaders(headers: util.Map[String, String]): Builder = {
+ this.headers = headers
+ this
+ }
+
def build(): JobSubmitAction = {
val submitAction = new JobSubmitAction
submitAction.setUser(user)
@@ -145,6 +154,11 @@ object JobSubmitAction {
if (this.labels == null) this.labels = new util.HashMap[String, AnyRef]()
submitAction.addRequestPayload(TaskConstant.LABELS, this.labels)
+
+ if (this.headers == null) this.headers = new util.HashMap[String,
String]()
+ this.headers.asScala.foreach { case (k, v) =>
+ if (k != null && v != null) submitAction.addHeader(k, v)
+ }
submitAction
}
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala
index 45f0e8a89f..708689089a 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/request/ResultSetAction.scala
@@ -38,7 +38,9 @@ object ResultSetAction {
// default value is :org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL
private var nullValue: String = "LINKIS_NULL"
- private var enableLimit: Boolean = false
+ private var enableLimit: Option[Boolean] = None
+ private var columnPage: Int = _
+ private var columnPageSize: Int = _
def setUser(user: String): Builder = {
this.user = user
@@ -71,7 +73,17 @@ object ResultSetAction {
}
def setEnableLimit(enableLimit: Boolean): Builder = {
- this.enableLimit = enableLimit
+ this.enableLimit = Some(enableLimit)
+ this
+ }
+
+ def setColumnPage(columnPage: Int): Builder = {
+ this.columnPage = columnPage
+ this
+ }
+
+ def setColumnPageSize(columnPageSize: Int): Builder = {
+ this.columnPageSize = columnPageSize
this
}
@@ -83,8 +95,18 @@ object ResultSetAction {
if (page > 0) resultSetAction.setParameter("page", page)
if (pageSize > 0) resultSetAction.setParameter("pageSize", pageSize)
resultSetAction.setParameter("charset", charset)
- resultSetAction.setParameter("enableLimit", enableLimit)
+ if (enableLimit.isDefined) resultSetAction.setParameter("enableLimit",
true)
resultSetAction.setParameter("nullValue", nullValue)
+ if (columnPage > 0) {
+ resultSetAction.setParameter("columnPage", columnPage)
+ } else {
+ resultSetAction.setParameter("columnPage", null)
+ }
+ if (columnPageSize > 0) {
+ resultSetAction.setParameter("columnPageSize", columnPageSize)
+ } else {
+ resultSetAction.setParameter("columnPageSize", null)
+ }
resultSetAction.setUser(user)
resultSetAction
}
diff --git
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala
index 28f4b46b9b..e75929ea8f 100644
---
a/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala
+++
b/linkis-computation-governance/linkis-client/linkis-computation-client/src/main/scala/org/apache/linkis/ujes/client/utils/UJESClientUtils.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.ujes.client.utils
import org.apache.linkis.ujes.client.exception.UJESClientBuilderException
import org.apache.linkis.ujes.client.request.JobExecuteAction.{EngineType,
RunType}
+import org.apache.linkis.ujes.client.response.ResultSetResult
import java.util
import java.util.Locale
@@ -27,6 +28,8 @@ import com.google.gson.{Gson, JsonObject}
object UJESClientUtils {
+ val gson: Gson = new Gson()
+
def toEngineType(engineType: String): EngineType = engineType match {
case "spark" => EngineType.SPARK
case "hive" => EngineType.HIVE
@@ -71,13 +74,11 @@ object UJESClientUtils {
case "double" => value.toDouble
case "boolean" => value.toBoolean
case "byte" => value.toByte
- case "timestamp" => value
- case "date" => value
case "bigint" => value.toLong
case "decimal" => value.toDouble
- case "array" => new Gson().fromJson(value,
classOf[util.ArrayList[Object]])
- case "map" => new Gson().fromJson(value, classOf[util.HashMap[Object,
Object]])
- case "struct" => new Gson().fromJson(value, classOf[JsonObject])
+ case "array" => gson.fromJson(value, classOf[util.ArrayList[Object]])
+ case "map" => gson.fromJson(value, classOf[util.HashMap[Object,
Object]])
+ case "struct" => gson.fromJson(value, classOf[JsonObject])
case _ => value
}
}
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java
b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java
index dd12d1414c..0bc0b08c52 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/main/java/org/apache/linkis/ujes/jdbc/UJESSQLDriver.java
@@ -50,6 +50,7 @@ public class UJESSQLDriver extends UJESSQLDriverMain
implements Driver {
static String PASSWORD = "password";
static boolean TABLEAU_SERVER = false;
static String FIXED_SESSION = "fixedSession";
+ static String ENABLE_MULTI_RESULT = "enableMultiResult";
static String USE_SSL = "useSSL";
static String VERSION = "version";
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
index f75db82cdf..0be96b2c15 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
@@ -448,8 +448,6 @@ class LinkisSQLConnection(private[jdbc] val ujesClient:
UJESClient, props: Prope
val runType = EngineType.mapStringToEngineType(engine) match {
case EngineType.SPARK => RunType.SQL
case EngineType.HIVE => RunType.HIVE
- case EngineType.REPL => RunType.REPL
- case EngineType.DORIS => RunType.DORIS
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
case EngineType.NEBULA => RunType.NEBULA_SQL
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala
index f00d870978..e3a1475d2b 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLStatement.scala
@@ -37,6 +37,10 @@ class LinkisSQLStatement(private[jdbc] val
ujesSQLConnection: LinkisSQLConnectio
with Logging {
private var jobExecuteResult: JobExecuteResult = _
+
+ private val openedResultSets: util.ArrayList[UJESSQLResultSet] =
+ new util.ArrayList[UJESSQLResultSet]()
+
private var resultSet: UJESSQLResultSet = _
private var closed = false
private var maxRows: Int = 0
@@ -190,7 +194,7 @@ class LinkisSQLStatement(private[jdbc] val
ujesSQLConnection: LinkisSQLConnectio
override def getUpdateCount: Int = throwWhenClosed(-1)
- override def getMoreResults: Boolean = false
+ override def getMoreResults: Boolean =
getMoreResults(Statement.CLOSE_CURRENT_RESULT)
override def setFetchDirection(direction: Int): Unit =
throwWhenClosed(if (direction != ResultSet.FETCH_FORWARD) {
@@ -230,7 +234,45 @@ class LinkisSQLStatement(private[jdbc] val
ujesSQLConnection: LinkisSQLConnectio
override def getConnection: Connection = throwWhenClosed(ujesSQLConnection)
- override def getMoreResults(current: Int): Boolean = false
+ override def getMoreResults(current: Int): Boolean = {
+ if (this.resultSet == null) {
+ false
+ } else {
+ this.resultSet.getMetaData
+ val nextResultSet = this.resultSet.getNextResultSet
+ current match {
+ case Statement.CLOSE_CURRENT_RESULT =>
+ // 1 - CLOSE CURRENT RESULT SET
+ this.resultSet.close()
+ this.resultSet.clearNextResultSet
+ case Statement.KEEP_CURRENT_RESULT =>
+ // 2 - KEEP CURRENT RESULT SET
+ this.openedResultSets.add(this.resultSet)
+ this.resultSet.clearNextResultSet
+ case Statement.CLOSE_ALL_RESULTS =>
+ // 3 - CLOSE ALL RESULT SET
+ this.openedResultSets.add(this.resultSet)
+ closeAllOpenedResultSet()
+ case _ =>
+ throw new LinkisSQLException(
+ LinkisSQLErrorCode.NOSUPPORT_STATEMENT,
+ "getMoreResults with current not in 1,2,3 is not supported, see
Statement.getMoreResults"
+ )
+ }
+ this.resultSet = nextResultSet
+ this.resultSet != null
+ }
+ }
+
+ private def closeAllOpenedResultSet(): Any = {
+ val iterator = this.openedResultSets.iterator()
+ while (iterator.hasNext) {
+ val set = iterator.next()
+ if (!set.isClosed) {
+ set.close()
+ }
+ }
+ }
override def getGeneratedKeys: ResultSet = throw new LinkisSQLException(
LinkisSQLErrorCode.NOSUPPORT_STATEMENT,
@@ -302,6 +344,7 @@ class LinkisSQLStatement(private[jdbc] val
ujesSQLConnection: LinkisSQLConnectio
/**
* log[0] error log[1] warn log[2] info log[3] all (info + warn + error)
+ *
* @return
*/
def getAllLog(): Array[String] = {
@@ -316,6 +359,7 @@ class LinkisSQLStatement(private[jdbc] val
ujesSQLConnection: LinkisSQLConnectio
/**
* log[0] error log[1] warn log[2] info log[3] all (info + warn + error)
+ *
* @return
*/
def getIncrementalLog(): util.List[String] = {
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala
index c162b8c924..44686981e8 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLDriverMain.scala
@@ -78,6 +78,9 @@ class UJESSQLDriverMain extends Driver with Logging {
case Array(USE_SSL, value) =>
props.setProperty(USE_SSL, value)
false
+ case Array(ENABLE_MULTI_RESULT, value) =>
+ props.setProperty(ENABLE_MULTI_RESULT, value)
+ false
case Array(key, _) =>
if (StringUtils.isBlank(key)) {
throw new LinkisSQLException(
@@ -141,6 +144,7 @@ object UJESSQLDriverMain {
val PASSWORD = UJESSQLDriver.PASSWORD
val TABLEAU_SERVER = UJESSQLDriver.TABLEAU_SERVER
val FIXED_SESSION = UJESSQLDriver.FIXED_SESSION
+ val ENABLE_MULTI_RESULT = UJESSQLDriver.ENABLE_MULTI_RESULT
val USE_SSL = UJESSQLDriver.USE_SSL
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
index 39418a42d1..02e8551722 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/UJESSQLResultSet.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.ujes.jdbc
import org.apache.linkis.common.utils.Logging
import org.apache.linkis.ujes.client.request.ResultSetAction
import org.apache.linkis.ujes.client.response.ResultSetResult
+import org.apache.linkis.ujes.client.utils.UJESClientUtils
import org.apache.commons.lang3.StringUtils
@@ -76,6 +77,7 @@ class UJESSQLResultSet(
private var path: String = _
private var metaData: util.List[util.Map[String, String]] = _
private val statement: LinkisSQLStatement = ujesStatement
+ private var nextResultSet: UJESSQLResultSet = _
private val connection: LinkisSQLConnection =
ujesStatement.getConnection.asInstanceOf[LinkisSQLConnection]
@@ -102,7 +104,15 @@ class UJESSQLResultSet(
private def getResultSetPath(resultSetList: Array[String]): String = {
if (resultSetList.length > 0) {
- resultSetList(resultSetList.length - 1)
+ val enableMultiResult =
connection.getProps.getProperty(UJESSQLDriverMain.ENABLE_MULTI_RESULT)
+ enableMultiResult match {
+ case "Y" =>
+ // 配置开启时,返回首个结果集
+ resultSetList(0)
+ case _ =>
+ // 配置关闭时,返回以最后一个结果集为准
+ resultSetList(resultSetList.length - 1)
+ }
} else {
""
}
@@ -110,6 +120,12 @@ class UJESSQLResultSet(
private def resultSetResultInit(): Unit = {
if (path == null) path = getResultSetPath(resultSetList)
+ // 设置下一个结果集
+ val enableMultiResult =
connection.getProps.getProperty(UJESSQLDriverMain.ENABLE_MULTI_RESULT)
+ if (resultSetList.length > 1 && "Y".equals(enableMultiResult)) {
+ this.nextResultSet =
+ new UJESSQLResultSet(resultSetList.drop(1), this.statement, maxRows,
fetchSize)
+ }
val user = connection.getProps.getProperty("user")
if (StringUtils.isNotBlank(path)) {
val resultAction =
@@ -235,38 +251,7 @@ class UJESSQLResultSet(
}
private def evaluate(dataType: String, value: String): Any = {
-
- if (value == null || value.equals("null") || value.equals("NULL") ||
value.equals("Null")) {
- dataType.toLowerCase(Locale.getDefault) match {
- case "string" | "char" | "varchar" | "nvarchar" => value
- case _ => null
- }
- } else {
- dataType.toLowerCase(Locale.getDefault) match {
- case null => throw new
LinkisSQLException(LinkisSQLErrorCode.METADATA_EMPTY)
- case "char" | "varchar" | "nvarchar" | "string" => value
- case "short" => value.toShort
- case "smallint" => value.toShort
- case "tinyint" => value.toShort
- case "int" => value.toInt
- case "long" => value.toLong
- case "float" => value.toFloat
- case "double" => value.toDouble
- case "boolean" => value.toBoolean
- case "byte" => value.toByte
- case "timestamp" => value
- case "date" => value
- case "bigint" => value.toLong
- case "decimal" => value.toDouble
- case "array" => value.toArray
- case "map" => value
- case _ =>
- throw new LinkisSQLException(
- LinkisSQLErrorCode.PREPARESTATEMENT_TYPEERROR,
- s"Can't infer the SQL type to use for an instance of ${dataType}.
Use getObject() with an explicit Types value to specify the type to use"
- )
- }
- }
+ UJESClientUtils.evaluate(dataType, value)
}
private def getColumnValue(columnIndex: Int): Any = {
@@ -303,6 +288,10 @@ class UJESSQLResultSet(
}
}
+ def clearNextResultSet: Any = {
+ this.nextResultSet = null
+ }
+
override def getBoolean(columnIndex: Int): Boolean = {
val any = getColumnValue(columnIndex)
if (wasNull()) {
@@ -683,6 +672,8 @@ class UJESSQLResultSet(
true
}
+ def getNextResultSet: UJESSQLResultSet = this.nextResultSet
+
override def setFetchDirection(direction: Int): Unit = {
throw new LinkisSQLException(LinkisSQLErrorCode.NOSUPPORT_RESULTSET)
}
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java
b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java
index 3ebd21ae70..e319cd0254 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/LinkisSQLStatementTest.java
@@ -17,16 +17,26 @@
package org.apache.linkis.ujes.jdbc;
+import org.apache.linkis.governance.common.entity.ExecutionNodeStatus;
+import org.apache.linkis.governance.common.entity.task.RequestPersistTask;
+import org.apache.linkis.ujes.client.UJESClient;
+import org.apache.linkis.ujes.client.response.JobExecuteResult;
+import org.apache.linkis.ujes.client.response.JobInfoResult;
+import org.apache.linkis.ujes.client.response.ResultSetResult;
+
import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
/*
* Notice:
@@ -143,6 +153,184 @@ public class LinkisSQLStatementTest {
}
}
+ /**
+ * single query without next result set check point 1: getMoreResults
returns false check point 2:
+ * default getMoreResults, use Statement.CLOSE_CURRENT_RESULT. The current
result set is closed.
+ */
+ @Test
+ public void singleQueryWithNoMoreResult() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ LinkisSQLConnection linkisSQLConnection = Mockito.spy(new
LinkisSQLConnection(ujesClient, t));
+ LinkisSQLStatement linkisSQLStatement = new
LinkisSQLStatement(linkisSQLConnection);
+ Mockito.when(ujesClient.resultSet(any())).thenReturn(new
ResultSetResult());
+
+ JobExecuteResult jobExecuteResult = new JobExecuteResult();
+
Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString());
+ JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult());
+
Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult);
+
Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus();
+ Mockito.doReturn(new
RequestPersistTask()).when(jobInfoResult).getRequestPersistTask();
+
+ Mockito.doReturn(new String[] {"path
1"}).when(jobInfoResult).getResultSetList(ujesClient);
+
+ linkisSQLStatement.execute("select 1");
+ UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet();
+ assertNotNull(resultSet);
+ assertFalse(resultSet.isClosed());
+ // it will close current result set with default value 1
+ boolean moreResults = linkisSQLStatement.getMoreResults();
+ assertFalse(moreResults);
+ assertTrue(resultSet.isClosed());
+ }
+
+ /**
+ * multiple query without multiple result param, return one result check
point 1: 2 sql executed.
+ * 1 result set
+ */
+ @Test
+ public void multiQueryWithNoMoreResult() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ LinkisSQLConnection linkisSQLConnection = Mockito.spy(new
LinkisSQLConnection(ujesClient, t));
+ LinkisSQLStatement linkisSQLStatement = new
LinkisSQLStatement(linkisSQLConnection);
+ Mockito.when(ujesClient.resultSet(any())).thenReturn(new
ResultSetResult());
+ JobExecuteResult jobExecuteResult = new JobExecuteResult();
+
Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString());
+ JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult());
+
Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult);
+
Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus();
+ Mockito.doReturn(new
RequestPersistTask()).when(jobInfoResult).getRequestPersistTask();
+
+ Mockito.doReturn(new String[] {"path 1", "path 2"})
+ .when(jobInfoResult)
+ .getResultSetList(ujesClient);
+
+ linkisSQLStatement.execute("select 1;select 2;");
+ UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet();
+ assertNotNull(resultSet);
+ assertFalse(resultSet.isClosed());
+ // it will close current result set with default value 1
+ boolean moreResults = linkisSQLStatement.getMoreResults();
+ assertFalse(moreResults);
+ assertTrue(resultSet.isClosed());
+ }
+
+ /**
+ * multiple query executed with multiple result param is Y check point 1:
getMoreResults returns
+ * true check point 2: current result is closed check point 3: second
getMoreResults returns false
+ */
+ @Test
+ public void multiQueryWithMoreResult() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ LinkisSQLConnection linkisSQLConnection = Mockito.spy(new
LinkisSQLConnection(ujesClient, t));
+ LinkisSQLStatement linkisSQLStatement = new
LinkisSQLStatement(linkisSQLConnection);
+ Mockito.when(ujesClient.resultSet(any())).thenReturn(new
ResultSetResult());
+
+ JobExecuteResult jobExecuteResult = new JobExecuteResult();
+
Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString());
+ JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult());
+
Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult);
+
Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus();
+ Mockito.doReturn(new
RequestPersistTask()).when(jobInfoResult).getRequestPersistTask();
+
+ Mockito.doReturn(new String[] {"path 1", "path 2"})
+ .when(jobInfoResult)
+ .getResultSetList(ujesClient);
+
+ linkisSQLStatement.execute("select 1;select 2;");
+ UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet();
+ assertNotNull(resultSet);
+ assertFalse(resultSet.isClosed());
+ // it will close current result set with default value 1
+ boolean moreResults = linkisSQLStatement.getMoreResults();
+ assertTrue(moreResults);
+ assertTrue(resultSet.isClosed());
+ moreResults = linkisSQLStatement.getMoreResults();
+ assertFalse(moreResults);
+ }
+
+ /**
+ * multiple query executed with multiple result param is Y, and use
+ * LinkisSQLStatement.KEEP_CURRENT_RESULT check point 1: getMoreResults
returns true check point
+ * 2: current result is not close check point 3: second getMoreResults
returns false
+ */
+ @Test
+ public void multiQueryWithMoreResultNotCloseCurrent() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ LinkisSQLConnection linkisSQLConnection = Mockito.spy(new
LinkisSQLConnection(ujesClient, t));
+ LinkisSQLStatement linkisSQLStatement = new
LinkisSQLStatement(linkisSQLConnection);
+ Mockito.when(ujesClient.resultSet(any())).thenReturn(new
ResultSetResult());
+
+ JobExecuteResult jobExecuteResult = new JobExecuteResult();
+
Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString());
+ JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult());
+
Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult);
+
Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus();
+ Mockito.doReturn(new
RequestPersistTask()).when(jobInfoResult).getRequestPersistTask();
+
+ Mockito.doReturn(new String[] {"path 1", "path 2"})
+ .when(jobInfoResult)
+ .getResultSetList(ujesClient);
+
+ linkisSQLStatement.execute("select 1;select 2;");
+ UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet();
+ assertNotNull(resultSet);
+ assertFalse(resultSet.isClosed());
+ // it will close current result set with default value 1
+ boolean moreResults =
linkisSQLStatement.getMoreResults(LinkisSQLStatement.KEEP_CURRENT_RESULT);
+ assertTrue(moreResults);
+ assertFalse(resultSet.isClosed());
+ }
+
+ /**
+ * multiple query executed with multiple result param is Y, and use
+ * LinkisSQLStatement.CLOSE_ALL_RESULTS check point 1: getMoreResults
returns true check point 2:
+ * current result is not close check point 3: second getMoreResults returns
false check point 4:
+ * first result set is closed after second invoke getMoreResults
+ */
+ @Test
+ public void multiQueryWithMoreResultCloseAllOpenedCurrent() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ LinkisSQLConnection linkisSQLConnection = Mockito.spy(new
LinkisSQLConnection(ujesClient, t));
+ LinkisSQLStatement linkisSQLStatement = new
LinkisSQLStatement(linkisSQLConnection);
+ Mockito.when(ujesClient.resultSet(any())).thenReturn(new
ResultSetResult());
+
+ JobExecuteResult jobExecuteResult = new JobExecuteResult();
+
Mockito.doReturn(jobExecuteResult).when(linkisSQLConnection).toSubmit(anyString());
+ JobInfoResult jobInfoResult = Mockito.spy(new JobInfoResult());
+
Mockito.when(ujesClient.getJobInfo(jobExecuteResult)).thenReturn(jobInfoResult);
+
Mockito.doReturn(ExecutionNodeStatus.Succeed.name()).when(jobInfoResult).getJobStatus();
+ Mockito.doReturn(new
RequestPersistTask()).when(jobInfoResult).getRequestPersistTask();
+
+ Mockito.doReturn(new String[] {"path 1", "path 2"})
+ .when(jobInfoResult)
+ .getResultSetList(ujesClient);
+
+ linkisSQLStatement.execute("select 1;select 2;");
+ UJESSQLResultSet resultSet = linkisSQLStatement.getResultSet();
+ assertNotNull(resultSet);
+ assertFalse(resultSet.isClosed());
+ // it will close current result set with default value 1
+ boolean moreResults =
linkisSQLStatement.getMoreResults(Statement.KEEP_CURRENT_RESULT);
+ assertTrue(moreResults);
+ assertFalse(resultSet.isClosed());
+ moreResults =
linkisSQLStatement.getMoreResults(Statement.CLOSE_ALL_RESULTS);
+ assertFalse(moreResults);
+ assertTrue(resultSet.isClosed());
+ }
+
@AfterAll
public static void closeStateAndConn() {
if (statement != null) {
diff --git
a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java
b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java
index a8f0a179d0..c0631427ea 100644
---
a/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java
+++
b/linkis-computation-governance/linkis-jdbc-driver/src/test/java/org/apache/linkis/ujes/jdbc/UJESSQLResultSetTest.java
@@ -17,7 +17,14 @@
package org.apache.linkis.ujes.jdbc;
+import org.apache.linkis.ujes.client.UJESClient;
+import org.apache.linkis.ujes.client.request.ResultSetAction;
+import org.apache.linkis.ujes.client.response.ResultSetResult;
+
import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -25,6 +32,10 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
/*
* Notice:
@@ -137,4 +148,101 @@ public class UJESSQLResultSetTest {
Assertions.assertTrue(resultSet.isAfterLast());
}
}
+
+ /** single query result with no multiple result set check point 1:
nextResultSet is null */
+ @Test
+ public void singleQueryWithNoMoreResultSet() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ Mockito.when(ujesClient.resultSet(any())).thenReturn(new
ResultSetResult());
+
+ LinkisSQLConnection linkisSQLConnection = new
LinkisSQLConnection(ujesClient, t);
+
+ UJESSQLResultSet ujessqlResultSet =
+ new UJESSQLResultSet(
+ new String[] {"path1"}, new
LinkisSQLStatement(linkisSQLConnection), 0, 0);
+
+ ujessqlResultSet.next();
+
+ assertNull(ujessqlResultSet.getNextResultSet());
+ }
+
+ /**
+ * multiple result set with multi result switch is Y check point 1:
queryResult has two path,
+ * return first path. check point 2: the second result set returned check
point 3: the third
+ * result set is null
+ */
+ @Test
+ public void nultiQueryWithMoreResultSet() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ t.put(UJESSQLDriverMain.ENABLE_MULTI_RESULT(), "Y");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ List<String> pathList = new ArrayList<>();
+ Mockito.when(ujesClient.resultSet(any()))
+ .thenAnswer(
+ invocationOnMock -> {
+ ResultSetAction argument = invocationOnMock.getArgument(0);
+ String path = (String) argument.getParameters().get("path");
+ if (pathList.isEmpty()) {
+ assertEquals("path1", path);
+ }
+ pathList.add(path);
+
+ return new ResultSetResult();
+ });
+ LinkisSQLConnection linkisSQLConnection = new
LinkisSQLConnection(ujesClient, t);
+
+ UJESSQLResultSet ujessqlResultSet =
+ new UJESSQLResultSet(
+ new String[] {"path1", "path2"}, new
LinkisSQLStatement(linkisSQLConnection), 0, 0);
+
+ // 查询
+ ujessqlResultSet.next();
+
+ // 存在下一个结果集
+ UJESSQLResultSet nextResultSet = ujessqlResultSet.getNextResultSet();
+ assertNotNull(nextResultSet);
+ nextResultSet.next();
+
+ // 不存在第三个结果集
+ assertNull(nextResultSet.getNextResultSet());
+ }
+
+ /**
+ * multiple result set with multi result switch not Y check point 1:
queryResult has two path,
+ * return last path. check point 2: the next result set is null
+ */
+ @Test
+ public void nultiQueryWithNoMoreResultSet() {
+ Properties t = new Properties();
+ t.put("user", "hiveUser");
+ UJESClient ujesClient = Mockito.mock(UJESClient.class);
+ Mockito.when(ujesClient.resultSet(any()))
+ .thenAnswer(
+ invocationOnMock -> {
+ ResultSetAction argument = invocationOnMock.getArgument(0);
+ String path = (String) argument.getParameters().get("path");
+ assertEquals("path4", path);
+
+ return new ResultSetResult();
+ });
+
+ LinkisSQLConnection linkisSQLConnection = new
LinkisSQLConnection(ujesClient, t);
+
+ UJESSQLResultSet ujessqlResultSet =
+ new UJESSQLResultSet(
+ new String[] {"path1", "path2", "path3", "path4"},
+ new LinkisSQLStatement(linkisSQLConnection),
+ 0,
+ 0);
+
+ // 查询
+ ujessqlResultSet.next();
+
+ // 即使查询有多个结果集,也不会产生多个结果集返回
+ UJESSQLResultSet nextResultSet = ujessqlResultSet.getNextResultSet();
+ assertNull(nextResultSet);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]