This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b315123a6 [KYUUBI #1652] Support Flink yarn application mode
b315123a6 is described below
commit b315123a6b6dfa7b03a5ab7875856bdfd4e0eaed
Author: Paul Lin <[email protected]>
AuthorDate: Fri Apr 7 18:51:48 2023 +0800
[KYUUBI #1652] Support Flink yarn application mode
### _Why are the changes needed?_
Flink yarn application mode is crucial for the production usage of Flink
engine.
To test this PR locally, we should:
1) set `flink.execution.target=yarn-application` in `kyuubi-defaults.conf`.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #4604 from link3280/KYUUBI-1652.
Closes #1652
49b454f1e [Paul Lin] [KYUUBI #1652] Delay access to thrift services to
stablize tests
b91b64bf6 [Paul Lin] Revert "[KYUUBI #1652] Avoid hadoop conf injecting
into kyuubi conf"
c9f710b0f [Paul Lin] [KYUUBI #1652] Avoid hadoop conf injecting into kyuubi
conf
cde8a5477 [Paul Lin] [KYUUBI #1652] Improve docs
edba0ec79 [Paul Lin] [KYUUBI #1652] Improve codestyle
e03e055ae [Paul Lin] [KYUUBI #1652] Update docs according to the comments
490559cd8 [Paul Lin] [KYUUBI #1652] Update docs
769d1a8fa [Paul Lin] [KYUUBI #1652] Move zookeeper to test scope
bafb3f5a4 [Paul Lin] [KYUUBI #1652] Fix flink-it test
dd40c72b8 [Paul Lin] [KYUUBI #1652] Update docs
36c993fc2 [Paul Lin] [KYUUBI #1652] Fix javax.activation not found in
flink-it
2a751bdd6 [Paul Lin] [KYUUBI #1652] Introduce EmbeddedZookeeper in Flink
yarn tests
0933b7082 [Paul Lin] [KYUUBI #1652] Fix spotless issue
b858f7df6 [Paul Lin] [KYUUBI #1652] Fix Flink submit timeout because
failing to find hadoop conf
15801b598 [Paul Lin] [KYUUBI #1652] Replace unused jaxb
b210615e4 [Paul Lin] Update externals/kyuubi-flink-sql-engine/pom.xml
24b23da2c [Paul Lin] [KYUUBI #1652] Update jaxb scope to test
240efae1a [Paul Lin] [KYUUBI #1652] Update jaxb scope to runtime
0e9a508b6 [Paul Lin] [KYUUBI #1652] Update jaxb scope to runtime
b5dbd3346 [Paul Lin] [KYUUBI #1652] Fix jdk11 jaxb ClassNotFoundException
72ba3ee6d [Paul Lin] [KYUUBI #1652] Update tm memory to 1gb
4e10ea21f [Paul Lin] [KYUUBI #1652] Refactor flink engin tests
e9cec4a65 [Paul Lin] [KYUUBI #1652] Add flink-it tests
6eb9fd3ad [Paul Lin] [KYUUBI #1652] Fix ProcessBuilder tests
6aca061e6 [Paul Lin] [KYUUBI #1652] Fix ClassNotFoundException
7581a2a0d [Paul Lin] [KYUUBI #1652] Fix missing minicluster
412c34571 [Paul Lin] [KYUUBI #1652] Remove flink-yarn dependencies
0eafbd7b0 [Paul Lin] Update
externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
ee2c64d04 [Paul Lin] [KYUUBI #1652] Add Flink YARN application tests
a72627393 [Paul Lin] [KYUUBI #1652] Avoid flink-yarn dependencies
a75cb2579 [Paul Lin] [KYUUBI #1652] Fix test issue
b7e173f30 [Paul Lin] [KYUUBI #1652] Replace file-based Kyuubi conf with cli
args
693ad6529 [Paul Lin] [KYUUBI #1652] Removed unused imports
68e0081e1 [Paul Lin] Update
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
ba021de9d [Paul Lin] [KYUUBI #1652] Search flink-sql.* jars and add them to
pipeline jars
0846babbd [Paul Lin] [KYUUBI #1652] Avoid Scala bug
56413fe83 [Paul Lin] [KYUUBI #1652] Improve tmp files cleanup
8bdb672c4 [Paul Lin] [KYUUBI #1652] Explicitly load Kyuubi conf on Flink
engine start
0b6325000 [Paul Lin] [KYUUBI #1652] Fix test failures
0ba03e439 [Paul Lin] [KYUUBI #1652] Fix wrong Flink args
00f036b04 [Paul Lin] [KYUUBI #1652] Remove unused util methods
dfd2777ac [Paul Lin] [KYUUBI ##1652] Support Flink yarn application mode
Authored-by: Paul Lin <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
docs/deployment/settings.md | 7 +-
externals/kyuubi-flink-sql-engine/pom.xml | 38 +++
.../executors/EmbeddedExecutorFactory.java | 125 ++++++++++
...he.flink.core.execution.PipelineExecutorFactory | 16 ++
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 24 +-
.../engine/flink/operation/ExecuteStatement.scala | 9 +-
.../kyuubi/engine/flink/result/ResultSet.scala | 17 +-
.../engine/flink/WithDiscoveryFlinkSQLEngine.scala | 65 +++++
...LEngine.scala => WithFlinkSQLEngineLocal.scala} | 23 +-
.../engine/flink/WithFlinkSQLEngineOnYarn.scala | 265 +++++++++++++++++++++
.../engine/flink/WithFlinkTestResources.scala | 41 ++++
.../flink/operation/FlinkOperationLocalSuite.scala | 33 +++
.../operation/FlinkOperationOnYarnSuite.scala | 26 ++
.../flink/operation/FlinkOperationSuite.scala | 54 +++--
.../flink/operation/PlanOnlyOperationSuite.scala | 4 +-
integration-tests/kyuubi-flink-it/pom.xml | 31 +++
.../flink/WithKyuubiServerAndYarnMiniCluster.scala | 145 +++++++++++
.../operation/FlinkOperationSuiteOnYarn.scala | 113 +++++++++
.../org/apache/kyuubi/config/KyuubiConf.scala | 14 +-
.../apache/kyuubi/ha/client/ServiceDiscovery.scala | 1 +
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 2 +-
.../kyuubi/engine/KyuubiApplicationManager.scala | 8 +-
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 188 ++++++++++-----
.../engine/flink/FlinkProcessBuilderSuite.scala | 74 +++++-
.../org/apache/kyuubi/server/MiniYarnService.scala | 6 +-
25 files changed, 1190 insertions(+), 139 deletions(-)
diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 960f2c328..b12185c3c 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -136,9 +136,10 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.deregister.job.max.failures | 4
| Number of failures of job before deregistering the engine.
[...]
| kyuubi.engine.event.json.log.path |
file:///tmp/kyuubi/events | The location where all the engine events go for the
built-in JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS
Path: start with 'hdfs://'</li></ul>
[...]
| kyuubi.engine.event.loggers | SPARK
| A comma-separated list of engine history loggers, where
engine/session/operation etc events go.<ul> <li>SPARK: the events will be
written to the Spark listener bus.</li> <li>JSON: the events will be written to
the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be
done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that: Kyuubi
supports custom event handlers with the Jav [...]
-| kyuubi.engine.flink.extra.classpath | <undefined>
| The extra classpath for the Flink SQL engine, for configuring the
location of hadoop client jars, etc
[...]
-| kyuubi.engine.flink.java.options | <undefined>
| The extra Java options for the Flink SQL engine
[...]
-| kyuubi.engine.flink.memory | 1g
| The heap memory for the Flink SQL engine
[...]
+| kyuubi.engine.flink.application.jars | <undefined>
| A comma-separated list of the local jars to be shipped with the job
to the cluster. For example, SQL UDF jars. Only effective in yarn application
mode.
[...]
+| kyuubi.engine.flink.extra.classpath | <undefined>
| The extra classpath for the Flink SQL engine, for configuring the
location of hadoop client jars, etc. Only effective in yarn session mode.
[...]
+| kyuubi.engine.flink.java.options | <undefined>
| The extra Java options for the Flink SQL engine. Only effective in
yarn session mode.
[...]
+| kyuubi.engine.flink.memory | 1g
| The heap memory for the Flink SQL engine. Only effective in yarn
session mode.
[...]
| kyuubi.engine.hive.event.loggers | JSON
| A comma-separated list of engine history loggers, where
engine/session/operation etc events go.<ul> <li>JSON: the events will be
written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to
be done</li> <li>CUSTOM: to be done.</li></ul>
[...]
| kyuubi.engine.hive.extra.classpath | <undefined>
| The extra classpath for the Hive query engine, for configuring
location of the hadoop client jars and etc.
[...]
| kyuubi.engine.hive.java.options | <undefined>
| The extra Java options for the Hive query engine
[...]
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml
b/externals/kyuubi-flink-sql-engine/pom.xml
index f3633b904..0e499f978 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -126,11 +126,49 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-zookeeper_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.activation</groupId>
+ <artifactId>jakarta.activation-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
new file mode 100644
index 000000000..69d69a55c
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application.executors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.client.deployment.application.EmbeddedJobClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copied from Apache Flink to exposed the DispatcherGateway for Kyuubi
statements. */
+@Internal
+public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
+
+ private static Collection<JobID> bootstrapJobIds;
+
+ private static Collection<JobID> submittedJobIds;
+
+ private static DispatcherGateway dispatcherGateway;
+
+ private static ScheduledExecutor retryExecutor;
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(EmbeddedExecutorFactory.class);
+
+ public EmbeddedExecutorFactory() {
+ LOGGER.debug(
+ "{} loaded in thread {} with classloader {}.",
+ this.getClass().getCanonicalName(),
+ Thread.currentThread().getName(),
+ this.getClass().getClassLoader().toString());
+ }
+
+ /**
+ * Creates an {@link EmbeddedExecutorFactory}.
+ *
+ * @param submittedJobIds a list that is going to be filled with the job ids
of the new jobs that
+ * will be submitted. This is essentially used to return the submitted
job ids to the caller.
+ * @param dispatcherGateway the dispatcher of the cluster which is going to
be used to submit
+ * jobs.
+ */
+ public EmbeddedExecutorFactory(
+ final Collection<JobID> submittedJobIds,
+ final DispatcherGateway dispatcherGateway,
+ final ScheduledExecutor retryExecutor) {
+ // there should be only one instance of EmbeddedExecutorFactory
+ LOGGER.debug(
+ "{} initiated in thread {} with classloader {}.",
+ this.getClass().getCanonicalName(),
+ Thread.currentThread().getName(),
+ this.getClass().getClassLoader().toString());
+ checkState(EmbeddedExecutorFactory.submittedJobIds == null);
+ checkState(EmbeddedExecutorFactory.dispatcherGateway == null);
+ checkState(EmbeddedExecutorFactory.retryExecutor == null);
+ // submittedJobIds would be always 1, because we create a new list to
avoid concurrent access
+ // issues
+ EmbeddedExecutorFactory.submittedJobIds =
+ new ConcurrentLinkedQueue<>(checkNotNull(submittedJobIds));
+ EmbeddedExecutorFactory.bootstrapJobIds = submittedJobIds;
+ EmbeddedExecutorFactory.dispatcherGateway =
checkNotNull(dispatcherGateway);
+ EmbeddedExecutorFactory.retryExecutor = checkNotNull(retryExecutor);
+ }
+
+ @Override
+ public String getName() {
+ return EmbeddedExecutor.NAME;
+ }
+
+ @Override
+ public boolean isCompatibleWith(final Configuration configuration) {
+ // override Flink's implementation to allow usage in Kyuubi
+ LOGGER.debug("matching execution target: {}",
configuration.get(DeploymentOptions.TARGET));
+ return
configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase("yarn-application")
+ && configuration.toMap().getOrDefault("yarn.tags",
"").toLowerCase().contains("kyuubi");
+ }
+
+ @Override
+ public PipelineExecutor getExecutor(final Configuration configuration) {
+ checkNotNull(configuration);
+ Collection<JobID> executorJobIDs;
+ if (bootstrapJobIds.size() > 0) {
+ LOGGER.info("Submitting new Kyuubi job. Job already submitted: {}.",
submittedJobIds.size());
+ executorJobIDs = submittedJobIds;
+ } else {
+ LOGGER.info("Bootstrapping Flink SQL engine.");
+ executorJobIDs = bootstrapJobIds;
+ }
+ return new EmbeddedExecutor(
+ executorJobIDs,
+ dispatcherGateway,
+ (jobId, userCodeClassloader) -> {
+ final Time timeout =
+
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
+ return new EmbeddedJobClient(
+ jobId, dispatcherGateway, retryExecutor, timeout,
userCodeClassloader);
+ });
+ }
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
b/externals/kyuubi-flink-sql-engine/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
new file mode 100644
index 000000000..c394c07a7
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.application.executors.EmbeddedExecutorFactory
\ No newline at end of file
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 06fdc65ae..42061a369 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.flink.client.cli.{DefaultCLI, GenericCLI}
import org.apache.flink.configuration.{Configuration, DeploymentOptions,
GlobalConfiguration}
+import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.client.SqlClientException
import org.apache.flink.table.client.gateway.context.DefaultContext
import org.apache.flink.util.JarUtils
@@ -71,9 +72,12 @@ object FlinkSQLEngine extends Logging {
def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
+ info(s"Flink SQL engine classpath:
${System.getProperty("java.class.path")}")
+
FlinkEngineUtils.checkFlinkVersion()
try {
+ kyuubiConf.loadFileDefaults()
Utils.fromCommandLineArgs(args, kyuubiConf)
val flinkConfDir = sys.env.getOrElse(
"FLINK_CONF_DIR", {
@@ -100,6 +104,11 @@ object FlinkSQLEngine extends Logging {
val appName = s"kyuubi_${user}_flink_${Instant.now}"
flinkConf.setString("yarn.application.name", appName)
}
+ if (flinkConf.containsKey("high-availability.cluster-id")) {
+ flinkConf.setString(
+ "yarn.application.id",
+ flinkConf.toMap.get("high-availability.cluster-id"))
+ }
case "kubernetes-application" =>
if (!flinkConf.containsKey("kubernetes.cluster-id")) {
val appName = s"kyuubi-${user}-flink-${Instant.now}"
@@ -122,7 +131,11 @@ object FlinkSQLEngine extends Logging {
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
startEngine(engineContext)
- info("started engine...")
+ info("Flink engine started")
+
+ if ("yarn-application".equalsIgnoreCase(executionTarget)) {
+ bootstrapFlinkApplicationExecutor(flinkConf)
+ }
// blocking main thread
countDownLatch.await()
@@ -146,6 +159,15 @@ object FlinkSQLEngine extends Logging {
}
}
+ private def bootstrapFlinkApplicationExecutor(flinkConf: Configuration) = {
+ // trigger an execution to initiate EmbeddedExecutor
+ info("Running initial Flink SQL in application mode.")
+ val tableEnv = TableEnvironment.create(flinkConf)
+ val res = tableEnv.executeSql("select 'kyuubi'")
+ res.await()
+ info("Initial Flink SQL finished.")
+ }
+
private def discoverDependencies(
jars: Seq[URL],
libraries: Seq[URL]): List[URL] = {
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index de104150f..10ad5bf6d 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.TypedResult
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
-import org.apache.flink.table.operations.{Operation, QueryOperation}
+import org.apache.flink.table.operations.{ModifyOperation, Operation,
QueryOperation}
import org.apache.flink.table.operations.command._
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical._
@@ -80,6 +80,7 @@ class ExecuteStatement(
val operation = executor.parseStatement(sessionId, statement)
operation match {
case queryOperation: QueryOperation =>
runQueryOperation(queryOperation)
+ case modifyOperation: ModifyOperation =>
runModifyOperation(modifyOperation)
case setOperation: SetOperation =>
resultSet = OperationUtils.runSetOperation(setOperation, executor,
sessionId)
case resetOperation: ResetOperation =>
@@ -143,6 +144,12 @@ class ExecuteStatement(
}
}
+ private def runModifyOperation(operation: ModifyOperation): Unit = {
+ val result = executor.executeOperation(sessionId, operation)
+ jobId = result.getJobClient.asScala.map(_.getJobID)
+ resultSet = ResultSet.fromJobId(jobId.orNull)
+ }
+
private def runOperation(operation: Operation): Unit = {
val result = executor.executeOperation(sessionId, operation)
jobId = result.getJobClient.asScala.map(_.getJobID)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index 136733812..09c401988 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -22,7 +22,8 @@ import java.util
import scala.collection.JavaConverters._
import com.google.common.collect.Iterators
-import org.apache.flink.table.api.{ResultKind, TableResult}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.table.api.{DataTypes, ResultKind, TableResult}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
@@ -68,6 +69,20 @@ object ResultSet {
.build
}
+ def fromJobId(jobID: JobID): ResultSet = {
+ val data: Array[Row] = if (jobID != null) {
+ Array(Row.of(jobID.toString))
+ } else {
+ // should not happen
+ Array(Row.of("(Empty Job ID)"))
+ }
+ builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(Column.physical("result", DataTypes.STRING()))
+ .data(data)
+ .build;
+ }
+
def builder: Builder = new ResultSet.Builder
class Builder {
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
new file mode 100644
index 000000000..aebcce6c5
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import java.util.UUID
+
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID,
HA_NAMESPACE}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider}
+
+trait WithDiscoveryFlinkSQLEngine extends WithFlinkSQLEngineOnYarn {
+
+ override protected def engineRefId: String = UUID.randomUUID().toString
+
+ def namespace: String = "/kyuubi/flink-yarn-application-test"
+
+ def shareLevel: String = ShareLevel.USER.toString
+
+ def engineType: String = "flink"
+
+ override def withKyuubiConf: Map[String, String] = {
+ Map(
+ HA_NAMESPACE.key -> namespace,
+ HA_ENGINE_REF_ID.key -> engineRefId,
+ ENGINE_TYPE.key -> "FLINK_SQL",
+ ENGINE_SHARE_LEVEL.key -> shareLevel)
+ }
+
+ def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
+ DiscoveryClientProvider.withDiscoveryClient(conf)(f)
+ }
+
+ def getFlinkEngineServiceUrl: String = {
+ var hostPort: Option[(String, Int)] = None
+ var retries = 0
+ while (hostPort.isEmpty && retries < 5) {
+ withDiscoveryClient(client => hostPort = client.getServerHost(namespace))
+ retries += 1
+ Thread.sleep(1000L)
+ }
+ if (hostPort.isEmpty) {
+ throw new RuntimeException("Time out retrieving Flink engine service
url.")
+ }
+ // delay the access to thrift service because the thrift service
+ // may not be ready although it's registered
+ Thread.sleep(3000L)
+ s"jdbc:hive2://${hostPort.get._1}:${hostPort.get._2}"
+ }
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
similarity index 79%
rename from
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala
rename to
externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index fbfb8df29..c8435f9c5 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -24,32 +24,20 @@ import org.apache.flink.configuration.{Configuration,
RestOptions}
import org.apache.flink.runtime.minicluster.{MiniCluster,
MiniClusterConfiguration}
import org.apache.flink.table.client.gateway.context.DefaultContext
-import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
-trait WithFlinkSQLEngine extends KyuubiFunSuite {
+trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with
WithFlinkTestResources {
protected val flinkConfig = new Configuration()
protected var miniCluster: MiniCluster = _
protected var engine: FlinkSQLEngine = _
// conf will be loaded until start flink engine
def withKyuubiConf: Map[String, String]
- val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+ protected val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
protected var connectionUrl: String = _
- protected val GENERATED_UDF_CLASS: String = "LowerUDF"
-
- protected val GENERATED_UDF_CODE: String =
- s"""
- public class $GENERATED_UDF_CLASS extends
org.apache.flink.table.functions.ScalarFunction {
- public String eval(String str) {
- return str.toLowerCase();
- }
- }
- """
-
override def beforeAll(): Unit = {
startMiniCluster()
startFlinkEngine()
@@ -67,11 +55,6 @@ trait WithFlinkSQLEngine extends KyuubiFunSuite {
System.setProperty(k, v)
kyuubiConf.set(k, v)
}
- val udfJar = TestUserClassLoaderJar.createJarFile(
- Utils.createTempDir("test-jar").toFile,
- "test-classloader-udf.jar",
- GENERATED_UDF_CLASS,
- GENERATED_UDF_CODE)
val engineContext = new DefaultContext(
List(udfJar.toURI.toURL).asJava,
flinkConfig,
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
new file mode 100644
index 000000000..3847087b3
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import java.io.{File, FilenameFilter, FileWriter}
+import java.lang.ProcessBuilder.Redirect
+import java.net.URI
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite,
SCALA_COMPILE_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS,
KYUUBI_HOME}
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT,
ZK_CLIENT_PORT_ADDRESS}
+
+trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with
WithFlinkTestResources {
+
+ protected def engineRefId: String
+
+ protected val conf: KyuubiConf = new KyuubiConf(false)
+
+ private var hdfsCluster: MiniDFSCluster = _
+
+ private var yarnCluster: MiniYARNCluster = _
+
+ private var zkServer: EmbeddedZookeeper = _
+
+ def withKyuubiConf: Map[String, String]
+
+ private val yarnConf: YarnConfiguration = {
+ val yarnConfig = new YarnConfiguration()
+
+ // configurations copied from org.apache.flink.yarn.YarnTestBase
+ yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32)
+ yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
4096)
+
+
yarnConfig.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
true)
+ yarnConfig.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2)
+ yarnConfig.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2)
+
yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4)
+ yarnConfig.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600)
+ yarnConfig.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false)
+ // memory is overwritten in the MiniYARNCluster.
+ // so we have to change the number of cores for testing.
+ yarnConfig.setInt(YarnConfiguration.NM_VCORES, 666)
+
yarnConfig.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
99.0f)
+
yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
1000)
+ yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
5000)
+
+ // capacity-scheduler.xml is missing in hadoop-client-minicluster so this
is a workaround
+ yarnConfig.set("yarn.scheduler.capacity.root.queues",
"default,four_cores_queue")
+
+ yarnConfig.setInt("yarn.scheduler.capacity.root.default.capacity", 100)
+
yarnConfig.setFloat("yarn.scheduler.capacity.root.default.user-limit-factor", 1)
+ yarnConfig.setInt("yarn.scheduler.capacity.root.default.maximum-capacity",
100)
+ yarnConfig.set("yarn.scheduler.capacity.root.default.state", "RUNNING")
+
yarnConfig.set("yarn.scheduler.capacity.root.default.acl_submit_applications",
"*")
+
yarnConfig.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
+
+
yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-capacity",
100)
+
yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-applications",
10)
+
yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-allocation-vcores",
4)
+
yarnConfig.setFloat("yarn.scheduler.capacity.root.four_cores_queue.user-limit-factor",
1)
+
yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_submit_applications",
"*")
+
yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_administer_queue",
"*")
+
+ yarnConfig.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
+ // Set bind host to localhost to avoid java.net.BindException
+ yarnConfig.set(YarnConfiguration.RM_BIND_HOST, "localhost")
+ yarnConfig.set(YarnConfiguration.NM_BIND_HOST, "localhost")
+
+ yarnConfig
+ }
+
+ override def beforeAll(): Unit = {
+ zkServer = new EmbeddedZookeeper()
+ conf.set(ZK_CLIENT_PORT, 0).set(ZK_CLIENT_PORT_ADDRESS, "localhost")
+ zkServer.initialize(conf)
+ zkServer.start()
+ conf.set(HA_ADDRESSES, zkServer.getConnectString)
+
+ hdfsCluster = new MiniDFSCluster.Builder(new Configuration)
+ .numDataNodes(1)
+ .checkDataNodeAddrConfig(true)
+ .checkDataNodeHostConfig(true)
+ .build()
+
+ val hdfsServiceUrl = s"hdfs://localhost:${hdfsCluster.getNameNodePort}"
+ yarnConf.set("fs.defaultFS", hdfsServiceUrl)
+ yarnConf.addResource(hdfsCluster.getConfiguration(0))
+
+ val cp = System.getProperty("java.class.path")
+ // exclude kyuubi flink engine jar that has SPI for EmbeddedExecutorFactory
+ // which can't be initialized on the client side
+ val hadoopJars = cp.split(":").filter(s => !s.contains("flink") &&
!s.contains("log4j"))
+ val hadoopClasspath = hadoopJars.mkString(":")
+ yarnConf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, hadoopClasspath)
+
+ yarnCluster = new MiniYARNCluster("flink-engine-cluster", 1, 1, 1)
+ yarnCluster.init(yarnConf)
+ yarnCluster.start()
+
+ val hadoopConfDir = Utils.createTempDir().toFile
+ val writer = new FileWriter(new File(hadoopConfDir, "core-site.xml"))
+ yarnCluster.getConfig.writeXml(writer)
+ writer.close()
+
+ val envs = scala.collection.mutable.Map[String, String]()
+ val kyuubiExternals = Utils.getCodeSourceLocation(getClass)
+ .split("externals").head
+ val flinkHome = {
+ val candidates = Paths.get(kyuubiExternals, "externals",
"kyuubi-download", "target")
+ .toFile.listFiles(f => f.getName.contains("flink"))
+ if (candidates == null) None else candidates.map(_.toPath).headOption
+ }
+ if (flinkHome.isDefined) {
+ envs("FLINK_HOME") = flinkHome.get.toString
+ envs("FLINK_CONF_DIR") = Paths.get(flinkHome.get.toString,
"conf").toString
+ }
+ envs("HADOOP_CLASSPATH") = hadoopClasspath
+ envs("HADOOP_CONF_DIR") = hadoopConfDir.getAbsolutePath
+
+ startFlinkEngine(envs.toMap)
+
+ super.beforeAll()
+ }
+
+ private def startFlinkEngine(envs: Map[String, String]): Unit = {
+ val processBuilder: ProcessBuilder = new ProcessBuilder
+ processBuilder.environment().putAll(envs.asJava)
+
+ conf.set(ENGINE_FLINK_APPLICATION_JARS, udfJar.getAbsolutePath)
+ val flinkExtraJars = extraFlinkJars(envs("FLINK_HOME"))
+ val command = new ArrayBuffer[String]()
+
+ command += s"${envs("FLINK_HOME")}${File.separator}bin/flink"
+ command += "run-application"
+ command += "-t"
+ command += "yarn-application"
+ command += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+ command += s"-Dyarn.tags=KYUUBI,$engineRefId"
+ command += "-Djobmanager.memory.process.size=1g"
+ command += "-Dtaskmanager.memory.process.size=1g"
+ command += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
+ command += "-Dcontainerized.taskmanager.env.FLINK_CONF_DIR=."
+ command +=
s"-Dcontainerized.master.env.HADOOP_CONF_DIR=${envs("HADOOP_CONF_DIR")}"
+ command +=
s"-Dcontainerized.taskmanager.env.HADOOP_CONF_DIR=${envs("HADOOP_CONF_DIR")}"
+ command += "-Dexecution.target=yarn-application"
+ command += "-c"
+ command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+ command += s"${mainResource(envs).get}"
+
+ for ((k, v) <- withKyuubiConf) {
+ conf.set(k, v)
+ }
+
+ for ((k, v) <- conf.getAll) {
+ command += "--conf"
+ command += s"$k=$v"
+ }
+
+ processBuilder.command(command.toList.asJava)
+ processBuilder.redirectOutput(Redirect.INHERIT)
+ processBuilder.redirectError(Redirect.INHERIT)
+
+ info(s"staring flink yarn-application cluster for engine $engineRefId..")
+ val process = processBuilder.start()
+ process.waitFor()
+ info(s"flink yarn-application cluster for engine $engineRefId has started")
+ }
+
+ def extraFlinkJars(flinkHome: String): Array[String] = {
+ // locate flink sql jars
+ val flinkExtraJars = new ListBuffer[String]
+ val flinkSQLJars = Paths.get(flinkHome)
+ .resolve("opt")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.toLowerCase.startsWith("flink-sql-client") ||
+ name.toLowerCase.startsWith("flink-sql-gateway")
+ }
+ }).map(f => f.getAbsolutePath).sorted
+ flinkExtraJars ++= flinkSQLJars
+
+ val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS)
+ userJars.foreach(jars => flinkExtraJars ++= jars.split(","))
+ flinkExtraJars.toArray
+ }
+
+ /**
+ * Copied form org.apache.kyuubi.engine.ProcBuilder
+ * The engine jar or other runnable jar containing the main method
+ */
+ def mainResource(env: Map[String, String]): Option[String] = {
+ // 1. get the main resource jar for user specified config first
+ val module = "kyuubi-flink-sql-engine"
+ val shortName = "flink"
+ val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
+ conf.getOption(s"kyuubi.session.engine.$shortName.main.resource").filter {
userSpecified =>
+ // skip check exist if not local file.
+ val uri = new URI(userSpecified)
+ val schema = if (uri.getScheme != null) uri.getScheme else "file"
+ schema match {
+ case "file" => Files.exists(Paths.get(userSpecified))
+ case _ => true
+ }
+ }.orElse {
+ // 2. get the main resource jar from system build default
+ env.get(KYUUBI_HOME).toSeq
+ .flatMap { p =>
+ Seq(
+ Paths.get(p, "externals", "engines", shortName, jarName),
+ Paths.get(p, "externals", module, "target", jarName))
+ }
+ .find(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
+ }.orElse {
+ // 3. get the main resource from dev environment
+ val cwd = Utils.getCodeSourceLocation(getClass).split("externals")
+ assert(cwd.length > 1)
+ Option(Paths.get(cwd.head, "externals", module, "target", jarName))
+ .map(_.toAbsolutePath.toFile.getCanonicalPath)
+ }
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ if (yarnCluster != null) {
+ yarnCluster.stop()
+ yarnCluster = null
+ }
+ if (hdfsCluster != null) {
+ hdfsCluster.shutdown()
+ hdfsCluster = null
+ }
+ if (zkServer != null) {
+ zkServer.stop()
+ zkServer = null
+ }
+ }
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
new file mode 100644
index 000000000..6a85654f0
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
+
+trait WithFlinkTestResources {
+
+ protected val GENERATED_UDF_CLASS: String = "LowerUDF"
+
+ protected val GENERATED_UDF_CODE: String =
+ s"""
+ public class $GENERATED_UDF_CLASS extends
org.apache.flink.table.functions.ScalarFunction {
+ public String eval(String str) {
+ return str.toLowerCase();
+ }
+ }
+ """
+
+ protected val udfJar = TestUserClassLoaderJar.createJarFile(
+ Utils.createTempDir("test-jar").toFile,
+ "test-classloader-udf.jar",
+ GENERATED_UDF_CLASS,
+ GENERATED_UDF_CODE)
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
new file mode 100644
index 000000000..e4e6a5c67
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
+import org.apache.kyuubi.operation.NoneMode
+
+class FlinkOperationLocalSuite extends FlinkOperationSuite
+ with WithFlinkSQLEngineLocal {
+
+ override def withKyuubiConf: Map[String, String] =
+ Map(OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name)
+
+ override protected def jdbcUrl: String =
+ s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
new file mode 100644
index 000000000..b43e83db6
--- /dev/null
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.engine.flink.WithDiscoveryFlinkSQLEngine
+
+class FlinkOperationOnYarnSuite extends FlinkOperationSuite
+ with WithDiscoveryFlinkSQLEngine {
+
+ protected def jdbcUrl: String = getFlinkEngineServiceUrl
+}
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 8345d4f9f..77ce3b3ee 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -25,34 +25,17 @@ import scala.collection.JavaConverters._
import org.apache.flink.api.common.JobID
import org.apache.flink.table.types.logical.LogicalTypeRoot
import org.apache.hive.service.rpc.thrift._
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
+import org.apache.kyuubi.engine.flink.WithFlinkTestResources
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
-import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
-import org.apache.kyuubi.service.ServiceState._
-class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
- override def withKyuubiConf: Map[String, String] =
- Map(OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name)
-
- override protected def jdbcUrl: String =
- s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
-
- ignore("release session if shared level is CONNECTION") {
- logger.info(s"jdbc url is $jdbcUrl")
- assert(engine.getServiceState == STARTED)
- withJdbcStatement() { _ => }
- eventually(Timeout(20.seconds)) {
- assert(engine.getServiceState == STOPPED)
- }
- }
+abstract class FlinkOperationSuite extends HiveJDBCTestHelper with
WithFlinkTestResources {
test("get catalogs") {
withJdbcStatement() { statement =>
@@ -784,7 +767,8 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with
HiveJDBCTestHelper {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select map ['k1', 'v1', 'k2',
'v2']")
assert(resultSet.next())
- assert(resultSet.getString(1) == "{k1=v1, k2=v2}")
+ assert(List("{k1=v1, k2=v2}", "{k2=v2, k1=v1}")
+ .contains(resultSet.getString(1)))
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.JAVA_OBJECT)
}
@@ -966,16 +950,34 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with
HiveJDBCTestHelper {
}
}
- test("execute statement - insert into") {
+ test("execute statement - batch insert into") {
withMultipleConnectionJdbcStatement() { statement =>
statement.executeQuery("create table tbl_a (a int) with ('connector' =
'blackhole')")
val resultSet = statement.executeQuery("insert into tbl_a select 1")
val metadata = resultSet.getMetaData
- assert(metadata.getColumnName(1) ==
"default_catalog.default_database.tbl_a")
- assert(metadata.getColumnType(1) == java.sql.Types.BIGINT)
+ assert(metadata.getColumnName(1) === "result")
+ assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
assert(resultSet.next())
- assert(resultSet.getLong(1) == -1L)
- }
+ assert(resultSet.getString(1).length == 32)
+ };
+ }
+
+ test("execute statement - streaming insert into") {
+ withMultipleConnectionJdbcStatement()({ statement =>
+ // Flink currently doesn't support stop job statement, thus use a finite
stream
+ statement.executeQuery(
+ "create table tbl_a (a int) with (" +
+ "'connector' = 'datagen', " +
+ "'rows-per-second'='10', " +
+ "'number-of-rows'='100')")
+ statement.executeQuery("create table tbl_b (a int) with ('connector' =
'blackhole')")
+ val resultSet = statement.executeQuery("insert into tbl_b select * from
tbl_a")
+ val metadata = resultSet.getMetaData
+ assert(metadata.getColumnName(1) === "result")
+ assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
+ assert(resultSet.next())
+ assert(resultSet.getString(1).length == 32)
+ })
}
test("execute statement - set properties") {
diff --git
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
index 1194f3582..1657f21f6 100644
---
a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
+++
b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
@@ -20,10 +20,10 @@ package org.apache.kyuubi.engine.flink.operation
import java.sql.Statement
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
+import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
import org.apache.kyuubi.operation.{AnalyzeMode, ExecutionMode,
HiveJDBCTestHelper, ParseMode, PhysicalMode}
-class PlanOnlyOperationSuite extends WithFlinkSQLEngine with
HiveJDBCTestHelper {
+class PlanOnlyOperationSuite extends WithFlinkSQLEngineLocal with
HiveJDBCTestHelper {
override def withKyuubiConf: Map[String, String] =
Map(
diff --git a/integration-tests/kyuubi-flink-it/pom.xml
b/integration-tests/kyuubi-flink-it/pom.xml
index c6a55c62c..eada7841c 100644
--- a/integration-tests/kyuubi-flink-it/pom.xml
+++ b/integration-tests/kyuubi-flink-it/pom.xml
@@ -79,6 +79,37 @@
<scope>test</scope>
</dependency>
+ <!-- YARN -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-minicluster</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcprov-jdk15on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.activation</groupId>
+ <artifactId>jakarta.activation-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>jakarta.xml.bind</groupId>
+ <artifactId>jakarta.xml.bind-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/WithKyuubiServerAndYarnMiniCluster.scala
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/WithKyuubiServerAndYarnMiniCluster.scala
new file mode 100644
index 000000000..de9a8ae2d
--- /dev/null
+++
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/WithKyuubiServerAndYarnMiniCluster.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink
+
+import java.io.{File, FileWriter}
+import java.nio.file.Paths
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.server.{MiniDFSService, MiniYarnService}
+
+trait WithKyuubiServerAndYarnMiniCluster extends KyuubiFunSuite with
WithKyuubiServer {
+
+ val kyuubiHome: String =
Utils.getCodeSourceLocation(getClass).split("integration-tests").head
+
+ override protected val conf: KyuubiConf = new KyuubiConf(false)
+
+ protected var miniHdfsService: MiniDFSService = _
+
+ protected var miniYarnService: MiniYarnService = _
+
+ private val yarnConf: YarnConfiguration = {
+ val yarnConfig = new YarnConfiguration()
+
+ // configurations copied from org.apache.flink.yarn.YarnTestBase
+ yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32)
+ yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
4096)
+
+
yarnConfig.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
true)
+ yarnConfig.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2)
+ yarnConfig.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2)
+
yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4)
+ yarnConfig.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600)
+ yarnConfig.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false)
+ // memory is overwritten in the MiniYARNCluster.
+ // so we have to change the number of cores for testing.
+ yarnConfig.setInt(YarnConfiguration.NM_VCORES, 666)
+
yarnConfig.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
99.0f)
+
yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
1000)
+ yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
5000)
+
+ // capacity-scheduler.xml is missing in hadoop-client-minicluster so this
is a workaround
+ yarnConfig.set("yarn.scheduler.capacity.root.queues",
"default,four_cores_queue")
+
+ yarnConfig.setInt("yarn.scheduler.capacity.root.default.capacity", 100)
+
yarnConfig.setFloat("yarn.scheduler.capacity.root.default.user-limit-factor", 1)
+ yarnConfig.setInt("yarn.scheduler.capacity.root.default.maximum-capacity",
100)
+ yarnConfig.set("yarn.scheduler.capacity.root.default.state", "RUNNING")
+
yarnConfig.set("yarn.scheduler.capacity.root.default.acl_submit_applications",
"*")
+
yarnConfig.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
+
+
yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-capacity",
100)
+
yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-applications",
10)
+
yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-allocation-vcores",
4)
+
yarnConfig.setFloat("yarn.scheduler.capacity.root.four_cores_queue.user-limit-factor",
1)
+
yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_submit_applications",
"*")
+
yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_administer_queue",
"*")
+
+ yarnConfig.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
+ // Set bind host to localhost to avoid java.net.BindException
+ yarnConfig.set(YarnConfiguration.RM_BIND_HOST, "localhost")
+ yarnConfig.set(YarnConfiguration.NM_BIND_HOST, "localhost")
+
+ yarnConfig
+ }
+
+ override def beforeAll(): Unit = {
+ miniHdfsService = new MiniDFSService()
+ miniHdfsService.initialize(conf)
+ miniHdfsService.start()
+
+ val hdfsServiceUrl = s"hdfs://localhost:${miniHdfsService.getDFSPort}"
+ yarnConf.set("fs.defaultFS", hdfsServiceUrl)
+ yarnConf.addResource(miniHdfsService.getHadoopConf)
+
+ val cp = System.getProperty("java.class.path")
+ // exclude kyuubi flink engine jar that has SPI for EmbeddedExecutorFactory
+ // which can't be initialized on the client side
+ val hadoopJars = cp.split(":").filter(s => !s.contains("flink"))
+ val hadoopClasspath = hadoopJars.mkString(":")
+ yarnConf.set("yarn.application.classpath", hadoopClasspath)
+
+ miniYarnService = new MiniYarnService()
+ miniYarnService.setYarnConf(yarnConf)
+ miniYarnService.initialize(conf)
+ miniYarnService.start()
+
+ val hadoopConfDir = Utils.createTempDir().toFile
+ val writer = new FileWriter(new File(hadoopConfDir, "core-site.xml"))
+ yarnConf.writeXml(writer)
+ writer.close()
+
+ val flinkHome = {
+ val candidates = Paths.get(kyuubiHome, "externals", "kyuubi-download",
"target")
+ .toFile.listFiles(f => f.getName.contains("flink"))
+ if (candidates == null) None else candidates.map(_.toPath).headOption
+ }
+ if (flinkHome.isEmpty) {
+ throw new IllegalStateException(s"Flink home not found in
$kyuubiHome/externals")
+ }
+
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.KYUUBI_HOME", kyuubiHome)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.FLINK_HOME", flinkHome.get.toString)
+ conf.set(
+ s"$KYUUBI_ENGINE_ENV_PREFIX.FLINK_CONF_DIR",
+ s"${flinkHome.get.toString}${File.separator}conf")
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH", hadoopClasspath)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR",
hadoopConfDir.getAbsolutePath)
+ conf.set(s"flink.containerized.master.env.HADOOP_CLASSPATH",
hadoopClasspath)
+ conf.set(s"flink.containerized.master.env.HADOOP_CONF_DIR",
hadoopConfDir.getAbsolutePath)
+ conf.set(s"flink.containerized.taskmanager.env.HADOOP_CONF_DIR",
hadoopConfDir.getAbsolutePath)
+
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ if (miniYarnService != null) {
+ miniYarnService.stop()
+ miniYarnService = null
+ }
+ if (miniHdfsService != null) {
+ miniHdfsService.stop()
+ miniHdfsService = null
+ }
+ }
+}
diff --git
a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
new file mode 100644
index 000000000..afa4dce8f
--- /dev/null
+++
b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink.operation
+
+import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType}
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.it.flink.WithKyuubiServerAndYarnMiniCluster
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+
+class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
+ with HiveJDBCTestHelper {
+
+ override protected def jdbcUrl: String = {
+ // delay the access to thrift service because the thrift service
+ // may not be ready although it's registered
+ Thread.sleep(3000L)
+ getJdbcUrl
+ }
+
+ override def beforeAll(): Unit = {
+ conf
+ .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
+ .set(ENGINE_TYPE, "FLINK_SQL")
+ .set("flink.execution.target", "yarn-application")
+ .set("flink.parallelism.default", "6")
+ super.beforeAll()
+ }
+
+ test("get catalogs for flink sql") {
+ withJdbcStatement() { statement =>
+ val meta = statement.getConnection.getMetaData
+ val catalogs = meta.getCatalogs
+ val expected = Set("default_catalog").toIterator
+ while (catalogs.next()) {
+ assert(catalogs.getString(TABLE_CAT) === expected.next())
+ }
+ assert(!expected.hasNext)
+ assert(!catalogs.next())
+ }
+ }
+
+ test("execute statement - create/alter/drop table") {
+ withJdbcStatement() { statement =>
+ statement.executeQuery("create table tbl_a (a string) with ('connector'
= 'blackhole')")
+ assert(statement.execute("alter table tbl_a rename to tbl_b"))
+ assert(statement.execute("drop table tbl_b"))
+ }
+ }
+
+ test("execute statement - select column name with dots") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("select 'tmp.hello'")
+ assert(resultSet.next())
+ assert(resultSet.getString(1) === "tmp.hello")
+ }
+ }
+
+ test("set kyuubi conf into flink conf") {
+ withJdbcStatement() { statement =>
+ val resultSet = statement.executeQuery("SET")
+ // Flink does not support set key without value currently,
+ // thus read all rows to find the desired one
+ var success = false
+ while (resultSet.next() && !success) {
+ if (resultSet.getString(1) == "parallelism.default" &&
+ resultSet.getString(2) == "6") {
+ success = true
+ }
+ }
+ assert(success)
+ }
+ }
+
+ test("server info provider - server") {
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache
Kyuubi")
+ }
+ }
+ }
+
+ test("server info provider - engine") {
+ withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
+ withSessionHandle { (client, handle) =>
+ val req = new TGetInfoReq()
+ req.setSessionHandle(handle)
+ req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+ assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache
Flink")
+ }
+ }
+ }
+}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index b5229e2ad..2634bb4ab 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2367,14 +2367,14 @@ object KyuubiConf {
val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
- .doc("The heap memory for the Flink SQL engine")
+ .doc("The heap memory for the Flink SQL engine. Only effective in yarn
session mode.")
.version("1.6.0")
.stringConf
.createWithDefault("1g")
val ENGINE_FLINK_JAVA_OPTIONS: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.flink.java.options")
- .doc("The extra Java options for the Flink SQL engine")
+ .doc("The extra Java options for the Flink SQL engine. Only effective in
yarn session mode.")
.version("1.6.0")
.stringConf
.createOptional
@@ -2382,11 +2382,19 @@ object KyuubiConf {
val ENGINE_FLINK_EXTRA_CLASSPATH: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.flink.extra.classpath")
.doc("The extra classpath for the Flink SQL engine, for configuring the
location" +
- " of hadoop client jars, etc")
+ " of hadoop client jars, etc. Only effective in yarn session mode.")
.version("1.6.0")
.stringConf
.createOptional
+ val ENGINE_FLINK_APPLICATION_JARS: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.flink.application.jars")
+ .doc("A comma-separated list of the local jars to be shipped with the
job to the cluster. " +
+ "For example, SQL UDF jars. Only effective in yarn application mode.")
+ .version("1.8.0")
+ .stringConf
+ .createOptional
+
val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
buildConf("kyuubi.server.limit.connections.per.user")
.doc("Maximum kyuubi server connections per user." +
diff --git
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index bdb9b12fe..a1b1466d1 100644
---
a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++
b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -60,6 +60,7 @@ abstract class ServiceDiscovery(
override def start(): Unit = {
discoveryClient.registerService(conf, namespace, this)
+ info(s"Registered $name in namespace ${_namespace}.")
super.start()
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 63b37f1c5..b2b3ce909 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -187,7 +187,7 @@ private[kyuubi] class EngineRef(
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case FLINK_SQL =>
- conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
+ conf.setIfMissing(FlinkProcessBuilder.YARN_APP_KEY, defaultEngineName)
new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case TRINO =>
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 9b23e550d..02aed2866 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -105,10 +105,10 @@ object KyuubiApplicationManager {
conf.set("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY, tag)
}
- private def setupFlinkK8sTag(tag: String, conf: KyuubiConf): Unit = {
- val originalTag = conf.getOption(FlinkProcessBuilder.TAG_KEY).map(_ +
",").getOrElse("")
+ private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = {
+ val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ +
",").getOrElse("")
val newTag = s"${originalTag}KYUUBI" +
Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
- conf.set(FlinkProcessBuilder.TAG_KEY, newTag)
+ conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag)
}
val uploadWorkDir: Path = {
@@ -178,7 +178,7 @@ object KyuubiApplicationManager {
setupSparkK8sTag(applicationTag, conf)
case ("FLINK", _) =>
// running flink on other platforms is not yet supported
- setupFlinkK8sTag(applicationTag, conf)
+ setupFlinkYarnTag(applicationTag, conf)
// other engine types are running locally yet
case _ =>
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index b8146c4d2..8642d87d7 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -21,7 +21,7 @@ import java.io.{File, FilenameFilter}
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import com.google.common.annotations.VisibleForTesting
@@ -50,88 +50,150 @@ class FlinkProcessBuilder(
val flinkHome: String = getEngineHome(shortName)
+ val flinkExecutable: String = {
+ Paths.get(flinkHome, "bin", FLINK_EXEC_FILE).toFile.getCanonicalPath
+ }
+
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String =
"org.apache.kyuubi.engine.flink.FlinkSQLEngine"
override def env: Map[String, String] = conf.getEnvs +
- (FLINK_PROXY_USER_KEY -> proxyUser)
+ ("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse(
+ "FLINK_CONF_DIR",
+ s"$flinkHome${File.separator}conf"))
+
+ override def clusterManager(): Option[String] = Some("yarn")
override protected val commands: Array[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName,
clusterManager(), conf)
- val buffer = new ArrayBuffer[String]()
- buffer += executable
-
- val memory = conf.get(ENGINE_FLINK_MEMORY)
- buffer += s"-Xmx$memory"
- val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
- if (javaOptions.isDefined) {
- buffer += javaOptions.get
- }
- buffer += "-cp"
- val classpathEntries = new java.util.LinkedHashSet[String]
- // flink engine runtime jar
- mainResource.foreach(classpathEntries.add)
- // flink sql client jar
- val flinkSqlClientPath = Paths.get(flinkHome)
- .resolve("opt")
- .toFile
- .listFiles(new FilenameFilter {
- override def accept(dir: File, name: String): Boolean = {
- name.toLowerCase.startsWith("flink-sql-client")
+ // flink.execution.target are required in Kyuubi conf currently
+ val executionTarget = conf.getOption("flink.execution.target")
+ executionTarget match {
+ case Some("yarn-application") =>
+ val buffer = new ArrayBuffer[String]()
+ buffer += flinkExecutable
+ buffer += "run-application"
+
+ val flinkExtraJars = new ListBuffer[String]
+ // locate flink sql jars
+ val flinkSqlJars = Paths.get(flinkHome)
+ .resolve("opt")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.toLowerCase.startsWith("flink-sql-client") ||
+ name.toLowerCase.startsWith("flink-sql-gateway")
+ }
+ }).map(f => f.getAbsolutePath).sorted
+ flinkExtraJars ++= flinkSqlJars
+
+ val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS)
+ userJars.foreach(jars => flinkExtraJars ++= jars.split(","))
+
+ buffer += "-t"
+ buffer += "yarn-application"
+ buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+ buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
+ buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
+
+ val customFlinkConf = conf.getAllWithPrefix("flink", "")
+ customFlinkConf.foreach { case (k, v) =>
+ buffer += s"-D$k=$v"
}
- }).head.getAbsolutePath
- classpathEntries.add(flinkSqlClientPath)
-
- // jars from flink lib
- classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
-
- // classpath contains flink configurations, default to flink.home/conf
- classpathEntries.add(env.getOrElse("FLINK_CONF_DIR",
s"$flinkHome${File.separator}conf"))
- // classpath contains hadoop configurations
- env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
- env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
- env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
- val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
- hadoopCp.foreach(classpathEntries.add)
- val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
- extraCp.foreach(classpathEntries.add)
- if (hadoopCp.isEmpty && extraCp.isEmpty) {
- warn(s"The conf of ${FLINK_HADOOP_CLASSPATH_KEY} and
${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
- s" is empty.")
- debug("Detected development environment")
- mainResource.foreach { path =>
- val devHadoopJars = Paths.get(path).getParent
- .resolve(s"scala-$SCALA_COMPILE_VERSION")
- .resolve("jars")
- if (!Files.exists(devHadoopJars)) {
- throw new KyuubiException(s"The path $devHadoopJars does not exists.
" +
- s"Please set ${FLINK_HADOOP_CLASSPATH_KEY} or
${ENGINE_FLINK_EXTRA_CLASSPATH.key} " +
- s"for configuring location of hadoop client jars, etc")
+
+ buffer += "-c"
+ buffer += s"$mainClass"
+ buffer += s"${mainResource.get}"
+
+ buffer += "--conf"
+ buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+ conf.getAll.foreach { case (k, v) =>
+ if (k.startsWith("kyuubi.")) {
+ buffer += "--conf"
+ buffer += s"$k=$v"
+ }
}
- classpathEntries.add(s"$devHadoopJars${File.separator}*")
- }
- }
- buffer += classpathEntries.asScala.mkString(File.pathSeparator)
- buffer += mainClass
- buffer += "--conf"
- buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+ buffer.toArray
+
+ case _ =>
+ val buffer = new ArrayBuffer[String]()
+ buffer += executable
- for ((k, v) <- conf.getAll) {
- buffer += "--conf"
- buffer += s"$k=$v"
+ val memory = conf.get(ENGINE_FLINK_MEMORY)
+ buffer += s"-Xmx$memory"
+ val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
+ if (javaOptions.isDefined) {
+ buffer += javaOptions.get
+ }
+
+ buffer += "-cp"
+ val classpathEntries = new java.util.LinkedHashSet[String]
+ // flink engine runtime jar
+ mainResource.foreach(classpathEntries.add)
+ // flink sql client jar
+ val flinkSqlClientPath = Paths.get(flinkHome)
+ .resolve("opt")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.toLowerCase.startsWith("flink-sql-client")
+ }
+ }).head.getAbsolutePath
+ classpathEntries.add(flinkSqlClientPath)
+
+ // jars from flink lib
+
classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
+
+ // classpath contains flink configurations, default to flink.home/conf
+ classpathEntries.add(env.getOrElse("FLINK_CONF_DIR",
s"$flinkHome${File.separator}conf"))
+ // classpath contains hadoop configurations
+ env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
+ env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
+ env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
+ val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
+ hadoopCp.foreach(classpathEntries.add)
+ val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+ extraCp.foreach(classpathEntries.add)
+ if (hadoopCp.isEmpty && extraCp.isEmpty) {
+ warn(s"The conf of ${FLINK_HADOOP_CLASSPATH_KEY} and " +
+ s"${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty.")
+ debug("Detected development environment.")
+ mainResource.foreach { path =>
+ val devHadoopJars = Paths.get(path).getParent
+ .resolve(s"scala-$SCALA_COMPILE_VERSION")
+ .resolve("jars")
+ if (!Files.exists(devHadoopJars)) {
+ throw new KyuubiException(s"The path $devHadoopJars does not
exists. " +
+ s"Please set ${FLINK_HADOOP_CLASSPATH_KEY} or
${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
+ s" for configuring location of hadoop client jars, etc.")
+ }
+ classpathEntries.add(s"$devHadoopJars${File.separator}*")
+ }
+ }
+ buffer += classpathEntries.asScala.mkString(File.pathSeparator)
+ buffer += mainClass
+
+ buffer += "--conf"
+ buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+
+ conf.getAll.foreach { case (k, v) =>
+ buffer += "--conf"
+ buffer += s"$k=$v"
+ }
+ buffer.toArray
}
- buffer.toArray
}
override def shortName: String = "flink"
}
object FlinkProcessBuilder {
- final val APP_KEY = "yarn.application.name"
- final val TAG_KEY = "yarn.tags"
+ final val FLINK_EXEC_FILE = "flink"
+ final val YARN_APP_KEY = "yarn.application.name"
+ final val YARN_TAG_KEY = "yarn.tags"
final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 7ee38d4ef..53450b589 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.engine.flink
import java.io.File
+import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
@@ -25,18 +26,36 @@ import scala.util.matching.Regex
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_EXTRA_CLASSPATH,
ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS,
ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
class FlinkProcessBuilderSuite extends KyuubiFunSuite {
- private def conf = KyuubiConf().set("kyuubi.on", "off")
+ private def sessionModeConf = KyuubiConf()
+ .set("flink.execution.target", "yarn-session")
+ .set("kyuubi.on", "off")
.set(ENGINE_FLINK_MEMORY, "512m")
.set(
ENGINE_FLINK_JAVA_OPTIONS,
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
+ private def applicationModeConf = KyuubiConf()
+ .set("flink.execution.target", "yarn-application")
+ .set(ENGINE_FLINK_APPLICATION_JARS, tempUdfJar.toString)
+ .set("kyuubi.on", "off")
+
+ private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
+ private val tempOpt =
+ Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString,
"opt")).toFile
+ Files.createFile(Paths.get(tempOpt.toPath.toString,
"flink-sql-client-1.16.1.jar"))
+ Files.createFile(Paths.get(tempOpt.toPath.toString,
"flink-sql-gateway-1.16.1.jar"))
+ private val tempUsrLib =
+ Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString,
"usrlib")).toFile
+ private val tempUdfJar =
+ Files.createFile(Paths.get(tempUsrLib.toPath.toString, "test-udf.jar"))
+
private def envDefault: ListMap[String, String] = ListMap(
- "JAVA_HOME" -> s"${File.separator}jdk")
+ "JAVA_HOME" -> s"${File.separator}jdk",
+ "FLINK_HOME" -> s"${tempFlinkHome.toPath}")
private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
@@ -44,11 +63,12 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def envWithAllHadoop: ListMap[String, String] =
envWithoutHadoopCLASSPATH +
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
private def confStr: String = {
- conf.clone.set("yarn.tags", "KYUUBI").getAll
+ sessionModeConf.clone.set("yarn.tags", "KYUUBI").getAll
.map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
.mkString(" ")
}
- private def matchActualAndExpected(builder: FlinkProcessBuilder): Unit = {
+
+ private def matchActualAndExpectedSessionMode(builder: FlinkProcessBuilder):
Unit = {
val actualCommands = builder.toString
val classpathStr = constructClasspathStr(builder)
val expectedCommands =
@@ -59,6 +79,27 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
assert(matcher.matches())
}
+ private def matchActualAndExpectedApplicationMode(builder:
FlinkProcessBuilder): Unit = {
+ val actualCommands = builder.toString
+ val expectedCommands =
+ escapePaths(s"${builder.flinkExecutable} run-application ") +
+ s"-t yarn-application " +
+
s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar
" +
+ s"-Dyarn\\.tags=KYUUBI " +
+ s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " +
+ s"-Dexecution.target=yarn-application " +
+ s"-c org\\.apache\\.kyuubi\\.engine\\.flink\\.FlinkSQLEngine " +
+ s".*kyuubi-flink-sql-engine_.*jar" +
+ s"(?: \\\\\\n\\t--conf \\S+=\\S+)+"
+ val regex = new Regex(expectedCommands)
+ val matcher = regex.pattern.matcher(actualCommands)
+ assert(matcher.matches())
+ }
+
+ private def escapePaths(path: String): String = {
+ path.replaceAll("/", "\\/")
+ }
+
private def constructClasspathStr(builder: FlinkProcessBuilder) = {
val classpathEntries = new java.util.LinkedHashSet[String]
builder.mainResource.foreach(classpathEntries.add)
@@ -69,11 +110,11 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
classpathEntries.add(s"$flinkHome$flinkConfPathSuffix")
val envMap = builder.env
envMap.foreach { case (k, v) =>
- if (!k.equals("JAVA_HOME")) {
+ if (!k.equals("JAVA_HOME") && !k.equals("FLINK_HOME")) {
classpathEntries.add(v)
}
}
- val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+ val extraCp = sessionModeConf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
extraCp.foreach(classpathEntries.add)
val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
classpathStr
@@ -86,18 +127,25 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private val flinkConfPathSuffix = s"${File.separator}conf"
private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
- test("all hadoop related environment variables are configured") {
- val builder = new FlinkProcessBuilder("vinoyang", conf) {
+ test("session mode - all hadoop related environment variables are
configured") {
+ val builder = new FlinkProcessBuilder("vinoyang", sessionModeConf) {
override def env: Map[String, String] = envWithAllHadoop
}
- matchActualAndExpected(builder)
+ matchActualAndExpectedSessionMode(builder)
}
- test("only FLINK_HADOOP_CLASSPATH environment variables are configured") {
- val builder = new FlinkProcessBuilder("vinoyang", conf) {
+ test("session mode - only FLINK_HADOOP_CLASSPATH environment variables are
configured") {
+ val builder = new FlinkProcessBuilder("vinoyang", sessionModeConf) {
override def env: Map[String, String] = envDefault +
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
}
- matchActualAndExpected(builder)
+ matchActualAndExpectedSessionMode(builder)
+ }
+
+ test("application mode - default env") {
+ val builder = new FlinkProcessBuilder("paullam", applicationModeConf) {
+ override def env: Map[String, String] = envDefault
+ }
+ matchActualAndExpectedApplicationMode(builder)
}
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index 1a73cc24c..68a175efc 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.service.AbstractService
class MiniYarnService extends AbstractService("TestMiniYarnService") {
private val hadoopConfDir: File = Utils.createTempDir().toFile
- private val yarnConf: YarnConfiguration = {
+ private var yarnConf: YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
// Disable the disk utilization check to avoid the test hanging when
people's disks are
// getting full.
@@ -71,6 +71,10 @@ class MiniYarnService extends
AbstractService("TestMiniYarnService") {
}
private val yarnCluster: MiniYARNCluster = new MiniYARNCluster(getName, 1,
1, 1)
+ def setYarnConf(yarnConf: YarnConfiguration): Unit = {
+ this.yarnConf = yarnConf
+ }
+
override def initialize(conf: KyuubiConf): Unit = {
yarnCluster.init(yarnConf)
super.initialize(conf)