This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b315123a6 [KYUUBI #1652] Support Flink yarn application mode
b315123a6 is described below

commit b315123a6b6dfa7b03a5ab7875856bdfd4e0eaed
Author: Paul Lin <[email protected]>
AuthorDate: Fri Apr 7 18:51:48 2023 +0800

    [KYUUBI #1652] Support Flink yarn application mode
    
    ### _Why are the changes needed?_
    Flink yarn application mode is crucial for the production usage of the Flink engine.
    
    To test this PR locally, we should:
    
    1) set `flink.execution.target=yarn-application` in `kyuubi-defaults.conf`, as in the sample below.
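    
    A minimal `kyuubi-defaults.conf` sketch for such a test (the engine type and jar lines below are illustrative assumptions; only the execution target is strictly required):
    
        kyuubi.engine.type                    FLINK_SQL
        flink.execution.target                yarn-application
        # illustrative: ship local UDF jars with the job (yarn application mode only)
        kyuubi.engine.flink.application.jars  /path/to/my-udf.jar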
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4604 from link3280/KYUUBI-1652.
    
    Closes #1652
    
    49b454f1e [Paul Lin] [KYUUBI #1652] Delay access to thrift services to stabilize tests
    b91b64bf6 [Paul Lin] Revert "[KYUUBI #1652] Avoid hadoop conf injecting into kyuubi conf"
    c9f710b0f [Paul Lin] [KYUUBI #1652] Avoid hadoop conf injecting into kyuubi conf
    cde8a5477 [Paul Lin] [KYUUBI #1652] Improve docs
    edba0ec79 [Paul Lin] [KYUUBI #1652] Improve codestyle
    e03e055ae [Paul Lin] [KYUUBI #1652] Update docs according to the comments
    490559cd8 [Paul Lin] [KYUUBI #1652] Update docs
    769d1a8fa [Paul Lin] [KYUUBI #1652] Move zookeeper to test scope
    bafb3f5a4 [Paul Lin] [KYUUBI #1652] Fix flink-it test
    dd40c72b8 [Paul Lin] [KYUUBI #1652] Update docs
    36c993fc2 [Paul Lin] [KYUUBI #1652] Fix javax.activation not found in flink-it
    2a751bdd6 [Paul Lin] [KYUUBI #1652] Introduce EmbeddedZookeeper in Flink yarn tests
    0933b7082 [Paul Lin] [KYUUBI #1652] Fix spotless issue
    b858f7df6 [Paul Lin] [KYUUBI #1652] Fix Flink submit timeout caused by failing to find hadoop conf
    15801b598 [Paul Lin] [KYUUBI #1652] Replace unused jaxb
    b210615e4 [Paul Lin] Update externals/kyuubi-flink-sql-engine/pom.xml
    24b23da2c [Paul Lin] [KYUUBI #1652] Update jaxb scope to test
    240efae1a [Paul Lin] [KYUUBI #1652] Update jaxb scope to runtime
    0e9a508b6 [Paul Lin] [KYUUBI #1652] Update jaxb scope to runtime
    b5dbd3346 [Paul Lin] [KYUUBI #1652] Fix jdk11 jaxb ClassNotFoundException
    72ba3ee6d [Paul Lin] [KYUUBI #1652] Update tm memory to 1gb
    4e10ea21f [Paul Lin] [KYUUBI #1652] Refactor flink engine tests
    e9cec4a65 [Paul Lin] [KYUUBI #1652] Add flink-it tests
    6eb9fd3ad [Paul Lin] [KYUUBI #1652] Fix ProcessBuilder tests
    6aca061e6 [Paul Lin] [KYUUBI #1652] Fix ClassNotFoundException
    7581a2a0d [Paul Lin] [KYUUBI #1652] Fix missing minicluster
    412c34571 [Paul Lin] [KYUUBI #1652] Remove flink-yarn dependencies
    0eafbd7b0 [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
    ee2c64d04 [Paul Lin] [KYUUBI #1652] Add Flink YARN application tests
    a72627393 [Paul Lin] [KYUUBI #1652] Avoid flink-yarn dependencies
    a75cb2579 [Paul Lin] [KYUUBI #1652] Fix test issue
    b7e173f30 [Paul Lin] [KYUUBI #1652] Replace file-based Kyuubi conf with cli args
    693ad6529 [Paul Lin] [KYUUBI #1652] Removed unused imports
    68e0081e1 [Paul Lin] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
    ba021de9d [Paul Lin] [KYUUBI #1652] Search flink-sql.* jars and add them to pipeline jars
    0846babbd [Paul Lin] [KYUUBI #1652] Avoid Scala bug
    56413fe83 [Paul Lin] [KYUUBI #1652] Improve tmp files cleanup
    8bdb672c4 [Paul Lin] [KYUUBI #1652] Explicitly load Kyuubi conf on Flink engine start
    0b6325000 [Paul Lin] [KYUUBI #1652] Fix test failures
    0ba03e439 [Paul Lin] [KYUUBI #1652] Fix wrong Flink args
    00f036b04 [Paul Lin] [KYUUBI #1652] Remove unused util methods
    dfd2777ac [Paul Lin] [KYUUBI #1652] Support Flink yarn application mode
    
    Authored-by: Paul Lin <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 docs/deployment/settings.md                        |   7 +-
 externals/kyuubi-flink-sql-engine/pom.xml          |  38 +++
 .../executors/EmbeddedExecutorFactory.java         | 125 ++++++++++
 ...he.flink.core.execution.PipelineExecutorFactory |  16 ++
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |  24 +-
 .../engine/flink/operation/ExecuteStatement.scala  |   9 +-
 .../kyuubi/engine/flink/result/ResultSet.scala     |  17 +-
 .../engine/flink/WithDiscoveryFlinkSQLEngine.scala |  65 +++++
 ...LEngine.scala => WithFlinkSQLEngineLocal.scala} |  23 +-
 .../engine/flink/WithFlinkSQLEngineOnYarn.scala    | 265 +++++++++++++++++++++
 .../engine/flink/WithFlinkTestResources.scala      |  41 ++++
 .../flink/operation/FlinkOperationLocalSuite.scala |  33 +++
 .../operation/FlinkOperationOnYarnSuite.scala      |  26 ++
 .../flink/operation/FlinkOperationSuite.scala      |  54 +++--
 .../flink/operation/PlanOnlyOperationSuite.scala   |   4 +-
 integration-tests/kyuubi-flink-it/pom.xml          |  31 +++
 .../flink/WithKyuubiServerAndYarnMiniCluster.scala | 145 +++++++++++
 .../operation/FlinkOperationSuiteOnYarn.scala      | 113 +++++++++
 .../org/apache/kyuubi/config/KyuubiConf.scala      |  14 +-
 .../apache/kyuubi/ha/client/ServiceDiscovery.scala |   1 +
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |   2 +-
 .../kyuubi/engine/KyuubiApplicationManager.scala   |   8 +-
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  | 188 ++++++++++-----
 .../engine/flink/FlinkProcessBuilderSuite.scala    |  74 +++++-
 .../org/apache/kyuubi/server/MiniYarnService.scala |   6 +-
 25 files changed, 1190 insertions(+), 139 deletions(-)

diff --git a/docs/deployment/settings.md b/docs/deployment/settings.md
index 960f2c328..b12185c3c 100644
--- a/docs/deployment/settings.md
+++ b/docs/deployment/settings.md
@@ -136,9 +136,10 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
 | kyuubi.engine.deregister.job.max.failures                | 4 | Number of failures of job before deregistering the engine. [...]
 | kyuubi.engine.event.json.log.path                        | file:///tmp/kyuubi/events | The location where all the engine events go for the built-in JSON logger.<ul><li>Local Path: start with 'file://'</li><li>HDFS Path: start with 'hdfs://'</li></ul> [...]
 | kyuubi.engine.event.loggers                              | SPARK | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>SPARK: the events will be written to the Spark listener bus.</li> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: User-defined event handlers.</li></ul> Note that: Kyuubi supports custom event handlers with the Jav [...]
-| kyuubi.engine.flink.extra.classpath                      | &lt;undefined&gt; | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc [...]
-| kyuubi.engine.flink.java.options                         | &lt;undefined&gt; | The extra Java options for the Flink SQL engine [...]
-| kyuubi.engine.flink.memory                               | 1g | The heap memory for the Flink SQL engine [...]
+| kyuubi.engine.flink.application.jars                     | &lt;undefined&gt; | A comma-separated list of the local jars to be shipped with the job to the cluster. For example, SQL UDF jars. Only effective in yarn application mode. [...]
+| kyuubi.engine.flink.extra.classpath                      | &lt;undefined&gt; | The extra classpath for the Flink SQL engine, for configuring the location of hadoop client jars, etc. Only effective in yarn session mode. [...]
+| kyuubi.engine.flink.java.options                         | &lt;undefined&gt; | The extra Java options for the Flink SQL engine. Only effective in yarn session mode. [...]
+| kyuubi.engine.flink.memory                               | 1g | The heap memory for the Flink SQL engine. Only effective in yarn session mode. [...]
 | kyuubi.engine.hive.event.loggers                         | JSON | A comma-separated list of engine history loggers, where engine/session/operation etc events go.<ul> <li>JSON: the events will be written to the location of kyuubi.engine.event.json.log.path</li> <li>JDBC: to be done</li> <li>CUSTOM: to be done.</li></ul> [...]
 | kyuubi.engine.hive.extra.classpath                       | &lt;undefined&gt; | The extra classpath for the Hive query engine, for configuring location of the hadoop client jars and etc. [...]
 | kyuubi.engine.hive.java.options                          | &lt;undefined&gt; | The extra Java options for the Hive query engine [...]
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index f3633b904..0e499f978 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -126,11 +126,49 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-zookeeper_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-test-utils</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-minicluster</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.activation</groupId>
+            <artifactId>jakarta.activation-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.xml.bind</groupId>
+            <artifactId>jakarta.xml.bind-api</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
new file mode 100644
index 000000000..69d69a55c
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application.executors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.client.deployment.application.EmbeddedJobClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.core.execution.PipelineExecutorFactory;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.util.concurrent.ScheduledExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copied from Apache Flink to expose the DispatcherGateway for Kyuubi statements. */
+@Internal
+public class EmbeddedExecutorFactory implements PipelineExecutorFactory {
+
+  private static Collection<JobID> bootstrapJobIds;
+
+  private static Collection<JobID> submittedJobIds;
+
+  private static DispatcherGateway dispatcherGateway;
+
+  private static ScheduledExecutor retryExecutor;
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(EmbeddedExecutorFactory.class);
+
+  public EmbeddedExecutorFactory() {
+    LOGGER.debug(
+        "{} loaded in thread {} with classloader {}.",
+        this.getClass().getCanonicalName(),
+        Thread.currentThread().getName(),
+        this.getClass().getClassLoader().toString());
+  }
+
+  /**
+   * Creates an {@link EmbeddedExecutorFactory}.
+   *
+   * @param submittedJobIds a list that is going to be filled with the job ids of the new jobs that
+   *     will be submitted. This is essentially used to return the submitted job ids to the caller.
+   * @param dispatcherGateway the dispatcher of the cluster which is going to be used to submit
+   *     jobs.
+   */
+  public EmbeddedExecutorFactory(
+      final Collection<JobID> submittedJobIds,
+      final DispatcherGateway dispatcherGateway,
+      final ScheduledExecutor retryExecutor) {
+    // there should be only one instance of EmbeddedExecutorFactory
+    LOGGER.debug(
+        "{} initiated in thread {} with classloader {}.",
+        this.getClass().getCanonicalName(),
+        Thread.currentThread().getName(),
+        this.getClass().getClassLoader().toString());
+    checkState(EmbeddedExecutorFactory.submittedJobIds == null);
+    checkState(EmbeddedExecutorFactory.dispatcherGateway == null);
+    checkState(EmbeddedExecutorFactory.retryExecutor == null);
+    // submittedJobIds would always be 1, because we create a new list to avoid concurrent access issues
+    EmbeddedExecutorFactory.submittedJobIds =
+        new ConcurrentLinkedQueue<>(checkNotNull(submittedJobIds));
+    EmbeddedExecutorFactory.bootstrapJobIds = submittedJobIds;
+    EmbeddedExecutorFactory.dispatcherGateway = checkNotNull(dispatcherGateway);
+    EmbeddedExecutorFactory.retryExecutor = checkNotNull(retryExecutor);
+  }
+
+  @Override
+  public String getName() {
+    return EmbeddedExecutor.NAME;
+  }
+
+  @Override
+  public boolean isCompatibleWith(final Configuration configuration) {
+    // override Flink's implementation to allow usage in Kyuubi
+    LOGGER.debug("matching execution target: {}", 
configuration.get(DeploymentOptions.TARGET));
+    return 
configuration.get(DeploymentOptions.TARGET).equalsIgnoreCase("yarn-application")
+        && configuration.toMap().getOrDefault("yarn.tags", 
"").toLowerCase().contains("kyuubi");
+  }
+
+  @Override
+  public PipelineExecutor getExecutor(final Configuration configuration) {
+    checkNotNull(configuration);
+    Collection<JobID> executorJobIDs;
+    if (bootstrapJobIds.size() > 0) {
+      LOGGER.info("Submitting new Kyuubi job. Job already submitted: {}.", 
submittedJobIds.size());
+      executorJobIDs = submittedJobIds;
+    } else {
+      LOGGER.info("Bootstrapping Flink SQL engine.");
+      executorJobIDs = bootstrapJobIds;
+    }
+    return new EmbeddedExecutor(
+        executorJobIDs,
+        dispatcherGateway,
+        (jobId, userCodeClassloader) -> {
+          final Time timeout =
+              Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
+          return new EmbeddedJobClient(
+              jobId, dispatcherGateway, retryExecutor, timeout, userCodeClassloader);
+        });
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory b/externals/kyuubi-flink-sql-engine/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
new file mode 100644
index 000000000..c394c07a7
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/resources/META-INF/services/org.apache.flink.core.execution.PipelineExecutorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.application.executors.EmbeddedExecutorFactory
\ No newline at end of file
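
This service file registers the factory above with the JDK ServiceLoader, which is how Flink discovers PipelineExecutorFactory implementations at deploy time. A simplified sketch of that lookup (Flink actually drives it through its own executor service loader; the configuration values here are illustrative):

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.execution.PipelineExecutorFactory

    val configuration = new Configuration()
    configuration.setString("execution.target", "yarn-application")
    configuration.setString("yarn.tags", "KYUUBI,<engine-ref-id>") // placeholder tag

    // Each factory listed under META-INF/services is instantiated and matched
    // via isCompatibleWith; with a "kyuubi" yarn tag, the EmbeddedExecutorFactory
    // above is the one that matches.
    val factory = ServiceLoader.load(classOf[PipelineExecutorFactory])
      .asScala
      .find(_.isCompatibleWith(configuration))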
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 06fdc65ae..42061a369 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.flink.client.cli.{DefaultCLI, GenericCLI}
 import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.client.SqlClientException
 import org.apache.flink.table.client.gateway.context.DefaultContext
 import org.apache.flink.util.JarUtils
@@ -71,9 +72,12 @@ object FlinkSQLEngine extends Logging {
   def main(args: Array[String]): Unit = {
     SignalRegister.registerLogger(logger)
 
+    info(s"Flink SQL engine classpath: 
${System.getProperty("java.class.path")}")
+
     FlinkEngineUtils.checkFlinkVersion()
 
     try {
+      kyuubiConf.loadFileDefaults()
       Utils.fromCommandLineArgs(args, kyuubiConf)
       val flinkConfDir = sys.env.getOrElse(
         "FLINK_CONF_DIR", {
@@ -100,6 +104,11 @@ object FlinkSQLEngine extends Logging {
             val appName = s"kyuubi_${user}_flink_${Instant.now}"
             flinkConf.setString("yarn.application.name", appName)
           }
+          if (flinkConf.containsKey("high-availability.cluster-id")) {
+            flinkConf.setString(
+              "yarn.application.id",
+              flinkConf.toMap.get("high-availability.cluster-id"))
+          }
         case "kubernetes-application" =>
           if (!flinkConf.containsKey("kubernetes.cluster-id")) {
             val appName = s"kyuubi-${user}-flink-${Instant.now}"
@@ -122,7 +131,11 @@ object FlinkSQLEngine extends Logging {
       kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
       startEngine(engineContext)
-      info("started engine...")
+      info("Flink engine started")
+
+      if ("yarn-application".equalsIgnoreCase(executionTarget)) {
+        bootstrapFlinkApplicationExecutor(flinkConf)
+      }
 
       // blocking main thread
       countDownLatch.await()
@@ -146,6 +159,15 @@ object FlinkSQLEngine extends Logging {
     }
   }
 
+  private def bootstrapFlinkApplicationExecutor(flinkConf: Configuration) = {
+    // trigger an execution to initiate EmbeddedExecutor
+    info("Running initial Flink SQL in application mode.")
+    val tableEnv = TableEnvironment.create(flinkConf)
+    val res = tableEnv.executeSql("select 'kyuubi'")
+    res.await()
+    info("Initial Flink SQL finished.")
+  }
+
   private def discoverDependencies(
       jars: Seq[URL],
       libraries: Seq[URL]): List[URL] = {
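
In yarn-application mode the engine's main method runs inside the YARN ApplicationMaster, and Kyuubi only obtains the DispatcherGateway once Flink builds a pipeline executor; bootstrapFlinkApplicationExecutor forces that by running a trivial statement. A self-contained sketch of the same trigger (an illustrative sketch, assuming the Flink Table API on the classpath):

    import org.apache.flink.configuration.Configuration
    import org.apache.flink.table.api.TableEnvironment

    // A no-op query whose only purpose is to make Flink instantiate the
    // pipeline executor (and with it the EmbeddedExecutorFactory above).
    val flinkConf = new Configuration()
    val tableEnv = TableEnvironment.create(flinkConf)
    tableEnv.executeSql("select 'kyuubi'").await()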
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index de104150f..10ad5bf6d 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -28,7 +28,7 @@ import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.client.gateway.TypedResult
 import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
 import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
-import org.apache.flink.table.operations.{Operation, QueryOperation}
+import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
 import org.apache.flink.table.operations.command._
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical._
@@ -80,6 +80,7 @@ class ExecuteStatement(
       val operation = executor.parseStatement(sessionId, statement)
       operation match {
         case queryOperation: QueryOperation => runQueryOperation(queryOperation)
+        case modifyOperation: ModifyOperation => runModifyOperation(modifyOperation)
         case setOperation: SetOperation =>
           resultSet = OperationUtils.runSetOperation(setOperation, executor, sessionId)
         case resetOperation: ResetOperation =>
@@ -143,6 +144,12 @@ class ExecuteStatement(
     }
   }
 
+  private def runModifyOperation(operation: ModifyOperation): Unit = {
+    val result = executor.executeOperation(sessionId, operation)
+    jobId = result.getJobClient.asScala.map(_.getJobID)
+    resultSet = ResultSet.fromJobId(jobId.orNull)
+  }
+
   private def runOperation(operation: Operation): Unit = {
     val result = executor.executeOperation(sessionId, operation)
     jobId = result.getJobClient.asScala.map(_.getJobID)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index 136733812..09c401988 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -22,7 +22,8 @@ import java.util
 import scala.collection.JavaConverters._
 
 import com.google.common.collect.Iterators
-import org.apache.flink.table.api.{ResultKind, TableResult}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.table.api.{DataTypes, ResultKind, TableResult}
 import org.apache.flink.table.catalog.Column
 import org.apache.flink.types.Row
 
@@ -68,6 +69,20 @@ object ResultSet {
       .build
   }
 
+  def fromJobId(jobID: JobID): ResultSet = {
+    val data: Array[Row] = if (jobID != null) {
+      Array(Row.of(jobID.toString))
+    } else {
+      // should not happen
+      Array(Row.of("(Empty Job ID)"))
+    }
+    builder
+      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+      .columns(Column.physical("result", DataTypes.STRING()))
+      .data(data)
+      .build;
+  }
+
   def builder: Builder = new ResultSet.Builder
 
   class Builder {
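
With runModifyOperation and ResultSet.fromJobId in place, an INSERT statement now returns a single-row result whose "result" column carries the ID of the submitted Flink job (a 32-character hex string, as the updated tests below assert). A client-side sketch with an illustrative JDBC endpoint:

    import java.sql.DriverManager

    // Illustrative URL; any Kyuubi JDBC endpoint would do.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
    val stmt = conn.createStatement()
    stmt.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
    val rs = stmt.executeQuery("insert into tbl_a select 1")
    rs.next()
    val flinkJobId = rs.getString("result") // 32-char JobID of the INSERT job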
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
new file mode 100644
index 000000000..aebcce6c5
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import java.util.UUID
+
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
+import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider}
+
+trait WithDiscoveryFlinkSQLEngine extends WithFlinkSQLEngineOnYarn {
+
+  override protected def engineRefId: String = UUID.randomUUID().toString
+
+  def namespace: String = "/kyuubi/flink-yarn-application-test"
+
+  def shareLevel: String = ShareLevel.USER.toString
+
+  def engineType: String = "flink"
+
+  override def withKyuubiConf: Map[String, String] = {
+    Map(
+      HA_NAMESPACE.key -> namespace,
+      HA_ENGINE_REF_ID.key -> engineRefId,
+      ENGINE_TYPE.key -> "FLINK_SQL",
+      ENGINE_SHARE_LEVEL.key -> shareLevel)
+  }
+
+  def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
+    DiscoveryClientProvider.withDiscoveryClient(conf)(f)
+  }
+
+  def getFlinkEngineServiceUrl: String = {
+    var hostPort: Option[(String, Int)] = None
+    var retries = 0
+    while (hostPort.isEmpty && retries < 5) {
+      withDiscoveryClient(client => hostPort = client.getServerHost(namespace))
+      retries += 1
+      Thread.sleep(1000L)
+    }
+    if (hostPort.isEmpty) {
+      throw new RuntimeException("Time out retrieving Flink engine service 
url.")
+    }
+    // delay the access to thrift service because the thrift service
+    // may not be ready although it's registered
+    Thread.sleep(3000L)
+    s"jdbc:hive2://${hostPort.get._1}:${hostPort.get._2}"
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
similarity index 79%
rename from externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala
rename to externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index fbfb8df29..c8435f9c5 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -24,32 +24,20 @@ import org.apache.flink.configuration.{Configuration, RestOptions}
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
 import org.apache.flink.table.client.gateway.context.DefaultContext
 
-import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
 
-trait WithFlinkSQLEngine extends KyuubiFunSuite {
+trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources {
 
   protected val flinkConfig = new Configuration()
   protected var miniCluster: MiniCluster = _
   protected var engine: FlinkSQLEngine = _
   // conf will not be loaded until the flink engine starts
   def withKyuubiConf: Map[String, String]
-  val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+  protected val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
 
   protected var connectionUrl: String = _
 
-  protected val GENERATED_UDF_CLASS: String = "LowerUDF"
-
-  protected val GENERATED_UDF_CODE: String =
-    s"""
-      public class $GENERATED_UDF_CLASS extends org.apache.flink.table.functions.ScalarFunction {
-        public String eval(String str) {
-          return str.toLowerCase();
-        }
-      }
-     """
-
   override def beforeAll(): Unit = {
     startMiniCluster()
     startFlinkEngine()
@@ -67,11 +55,6 @@ trait WithFlinkSQLEngine extends KyuubiFunSuite {
       System.setProperty(k, v)
       kyuubiConf.set(k, v)
     }
-    val udfJar = TestUserClassLoaderJar.createJarFile(
-      Utils.createTempDir("test-jar").toFile,
-      "test-classloader-udf.jar",
-      GENERATED_UDF_CLASS,
-      GENERATED_UDF_CODE)
     val engineContext = new DefaultContext(
       List(udfJar.toURI.toURL).asJava,
       flinkConfig,
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
new file mode 100644
index 000000000..3847087b3
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import java.io.{File, FilenameFilter, FileWriter}
+import java.lang.ProcessBuilder.Redirect
+import java.net.URI
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.server.MiniYARNCluster
+
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, KYUUBI_HOME}
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS}
+
+trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResources {
+
+  protected def engineRefId: String
+
+  protected val conf: KyuubiConf = new KyuubiConf(false)
+
+  private var hdfsCluster: MiniDFSCluster = _
+
+  private var yarnCluster: MiniYARNCluster = _
+
+  private var zkServer: EmbeddedZookeeper = _
+
+  def withKyuubiConf: Map[String, String]
+
+  private val yarnConf: YarnConfiguration = {
+    val yarnConfig = new YarnConfiguration()
+
+    // configurations copied from org.apache.flink.yarn.YarnTestBase
+    yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32)
+    yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096)
+
+    yarnConfig.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true)
+    yarnConfig.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2)
+    yarnConfig.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2)
+    yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4)
+    yarnConfig.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600)
+    yarnConfig.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false)
+    // memory is overwritten in the MiniYARNCluster.
+    // so we have to change the number of cores for testing.
+    yarnConfig.setInt(YarnConfiguration.NM_VCORES, 666)
+    yarnConfig.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0f)
+    yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 1000)
+    yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 5000)
+
+    // capacity-scheduler.xml is missing in hadoop-client-minicluster so this is a workaround
+    yarnConfig.set("yarn.scheduler.capacity.root.queues", "default,four_cores_queue")
+
+    yarnConfig.setInt("yarn.scheduler.capacity.root.default.capacity", 100)
+    yarnConfig.setFloat("yarn.scheduler.capacity.root.default.user-limit-factor", 1)
+    yarnConfig.setInt("yarn.scheduler.capacity.root.default.maximum-capacity", 100)
+    yarnConfig.set("yarn.scheduler.capacity.root.default.state", "RUNNING")
+    yarnConfig.set("yarn.scheduler.capacity.root.default.acl_submit_applications", "*")
+    yarnConfig.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
+
+    yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-capacity", 100)
+    yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-applications", 10)
+    yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-allocation-vcores", 4)
+    yarnConfig.setFloat("yarn.scheduler.capacity.root.four_cores_queue.user-limit-factor", 1)
+    yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_submit_applications", "*")
+    yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_administer_queue", "*")
+
+    yarnConfig.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
+    // Set bind host to localhost to avoid java.net.BindException
+    yarnConfig.set(YarnConfiguration.RM_BIND_HOST, "localhost")
+    yarnConfig.set(YarnConfiguration.NM_BIND_HOST, "localhost")
+
+    yarnConfig
+  }
+
+  override def beforeAll(): Unit = {
+    zkServer = new EmbeddedZookeeper()
+    conf.set(ZK_CLIENT_PORT, 0).set(ZK_CLIENT_PORT_ADDRESS, "localhost")
+    zkServer.initialize(conf)
+    zkServer.start()
+    conf.set(HA_ADDRESSES, zkServer.getConnectString)
+
+    hdfsCluster = new MiniDFSCluster.Builder(new Configuration)
+      .numDataNodes(1)
+      .checkDataNodeAddrConfig(true)
+      .checkDataNodeHostConfig(true)
+      .build()
+
+    val hdfsServiceUrl = s"hdfs://localhost:${hdfsCluster.getNameNodePort}"
+    yarnConf.set("fs.defaultFS", hdfsServiceUrl)
+    yarnConf.addResource(hdfsCluster.getConfiguration(0))
+
+    val cp = System.getProperty("java.class.path")
+    // exclude kyuubi flink engine jar that has SPI for EmbeddedExecutorFactory
+    // which can't be initialized on the client side
+    val hadoopJars = cp.split(":").filter(s => !s.contains("flink") && !s.contains("log4j"))
+    val hadoopClasspath = hadoopJars.mkString(":")
+    yarnConf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, hadoopClasspath)
+
+    yarnCluster = new MiniYARNCluster("flink-engine-cluster", 1, 1, 1)
+    yarnCluster.init(yarnConf)
+    yarnCluster.start()
+
+    val hadoopConfDir = Utils.createTempDir().toFile
+    val writer = new FileWriter(new File(hadoopConfDir, "core-site.xml"))
+    yarnCluster.getConfig.writeXml(writer)
+    writer.close()
+
+    val envs = scala.collection.mutable.Map[String, String]()
+    val kyuubiExternals = Utils.getCodeSourceLocation(getClass)
+      .split("externals").head
+    val flinkHome = {
+      val candidates = Paths.get(kyuubiExternals, "externals", "kyuubi-download", "target")
+        .toFile.listFiles(f => f.getName.contains("flink"))
+      if (candidates == null) None else candidates.map(_.toPath).headOption
+    }
+    if (flinkHome.isDefined) {
+      envs("FLINK_HOME") = flinkHome.get.toString
+      envs("FLINK_CONF_DIR") = Paths.get(flinkHome.get.toString, 
"conf").toString
+    }
+    envs("HADOOP_CLASSPATH") = hadoopClasspath
+    envs("HADOOP_CONF_DIR") = hadoopConfDir.getAbsolutePath
+
+    startFlinkEngine(envs.toMap)
+
+    super.beforeAll()
+  }
+
+  private def startFlinkEngine(envs: Map[String, String]): Unit = {
+    val processBuilder: ProcessBuilder = new ProcessBuilder
+    processBuilder.environment().putAll(envs.asJava)
+
+    conf.set(ENGINE_FLINK_APPLICATION_JARS, udfJar.getAbsolutePath)
+    val flinkExtraJars = extraFlinkJars(envs("FLINK_HOME"))
+    val command = new ArrayBuffer[String]()
+
+    command += s"${envs("FLINK_HOME")}${File.separator}bin/flink"
+    command += "run-application"
+    command += "-t"
+    command += "yarn-application"
+    command += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+    command += s"-Dyarn.tags=KYUUBI,$engineRefId"
+    command += "-Djobmanager.memory.process.size=1g"
+    command += "-Dtaskmanager.memory.process.size=1g"
+    command += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
+    command += "-Dcontainerized.taskmanager.env.FLINK_CONF_DIR=."
+    command += s"-Dcontainerized.master.env.HADOOP_CONF_DIR=${envs("HADOOP_CONF_DIR")}"
+    command += s"-Dcontainerized.taskmanager.env.HADOOP_CONF_DIR=${envs("HADOOP_CONF_DIR")}"
+    command += "-Dexecution.target=yarn-application"
+    command += "-c"
+    command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+    command += s"${mainResource(envs).get}"
+
+    for ((k, v) <- withKyuubiConf) {
+      conf.set(k, v)
+    }
+
+    for ((k, v) <- conf.getAll) {
+      command += "--conf"
+      command += s"$k=$v"
+    }
+
+    processBuilder.command(command.toList.asJava)
+    processBuilder.redirectOutput(Redirect.INHERIT)
+    processBuilder.redirectError(Redirect.INHERIT)
+
+    info(s"staring flink yarn-application cluster for engine $engineRefId..")
+    val process = processBuilder.start()
+    process.waitFor()
+    info(s"flink yarn-application cluster for engine $engineRefId has started")
+  }
+
+  def extraFlinkJars(flinkHome: String): Array[String] = {
+    // locate flink sql jars
+    val flinkExtraJars = new ListBuffer[String]
+    val flinkSQLJars = Paths.get(flinkHome)
+      .resolve("opt")
+      .toFile
+      .listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.toLowerCase.startsWith("flink-sql-client") ||
+          name.toLowerCase.startsWith("flink-sql-gateway")
+        }
+      }).map(f => f.getAbsolutePath).sorted
+    flinkExtraJars ++= flinkSQLJars
+
+    val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS)
+    userJars.foreach(jars => flinkExtraJars ++= jars.split(","))
+    flinkExtraJars.toArray
+  }
+
+  /**
+   * Copied from org.apache.kyuubi.engine.ProcBuilder
+   * The engine jar or other runnable jar containing the main method
+   */
+  def mainResource(env: Map[String, String]): Option[String] = {
+    // 1. get the main resource jar for user specified config first
+    val module = "kyuubi-flink-sql-engine"
+    val shortName = "flink"
+    val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
+    conf.getOption(s"kyuubi.session.engine.$shortName.main.resource").filter { 
userSpecified =>
+      // skip check exist if not local file.
+      val uri = new URI(userSpecified)
+      val schema = if (uri.getScheme != null) uri.getScheme else "file"
+      schema match {
+        case "file" => Files.exists(Paths.get(userSpecified))
+        case _ => true
+      }
+    }.orElse {
+      // 2. get the main resource jar from system build default
+      env.get(KYUUBI_HOME).toSeq
+        .flatMap { p =>
+          Seq(
+            Paths.get(p, "externals", "engines", shortName, jarName),
+            Paths.get(p, "externals", module, "target", jarName))
+        }
+        .find(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
+    }.orElse {
+      // 3. get the main resource from dev environment
+      val cwd = Utils.getCodeSourceLocation(getClass).split("externals")
+      assert(cwd.length > 1)
+      Option(Paths.get(cwd.head, "externals", module, "target", jarName))
+        .map(_.toAbsolutePath.toFile.getCanonicalPath)
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    if (yarnCluster != null) {
+      yarnCluster.stop()
+      yarnCluster = null
+    }
+    if (hdfsCluster != null) {
+      hdfsCluster.shutdown()
+      hdfsCluster = null
+    }
+    if (zkServer != null) {
+      zkServer.stop()
+      zkServer = null
+    }
+  }
+}
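
Putting startFlinkEngine, extraFlinkJars and mainResource together, the command this trait assembles boils down to roughly the following invocation (angle-bracket parts are placeholders resolved at runtime):

    $FLINK_HOME/bin/flink run-application -t yarn-application \
      -Dyarn.ship-files=<flink-sql-client jar>;<flink-sql-gateway jar> \
      -Dyarn.tags=KYUUBI,<engine-ref-id> \
      -Djobmanager.memory.process.size=1g \
      -Dtaskmanager.memory.process.size=1g \
      -Dexecution.target=yarn-application \
      -c org.apache.kyuubi.engine.flink.FlinkSQLEngine \
      <kyuubi-flink-sql-engine jar> \
      --conf <key>=<value> ...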
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
new file mode 100644
index 000000000..6a85654f0
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
+
+trait WithFlinkTestResources {
+
+  protected val GENERATED_UDF_CLASS: String = "LowerUDF"
+
+  protected val GENERATED_UDF_CODE: String =
+    s"""
+      public class $GENERATED_UDF_CLASS extends org.apache.flink.table.functions.ScalarFunction {
+        public String eval(String str) {
+          return str.toLowerCase();
+        }
+      }
+     """
+
+  protected val udfJar = TestUserClassLoaderJar.createJarFile(
+    Utils.createTempDir("test-jar").toFile,
+    "test-classloader-udf.jar",
+    GENERATED_UDF_CLASS,
+    GENERATED_UDF_CODE)
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
new file mode 100644
index 000000000..e4e6a5c67
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
+import org.apache.kyuubi.operation.NoneMode
+
+class FlinkOperationLocalSuite extends FlinkOperationSuite
+  with WithFlinkSQLEngineLocal {
+
+  override def withKyuubiConf: Map[String, String] =
+    Map(OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name)
+
+  override protected def jdbcUrl: String =
+    s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
new file mode 100644
index 000000000..b43e83db6
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import org.apache.kyuubi.engine.flink.WithDiscoveryFlinkSQLEngine
+
+class FlinkOperationOnYarnSuite extends FlinkOperationSuite
+  with WithDiscoveryFlinkSQLEngine {
+
+  protected def jdbcUrl: String = getFlinkEngineServiceUrl
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 8345d4f9f..77ce3b3ee 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -25,34 +25,17 @@ import scala.collection.JavaConverters._
 import org.apache.flink.api.common.JobID
 import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.hive.service.rpc.thrift._
-import org.scalatest.concurrent.PatienceConfiguration.Timeout
-import org.scalatest.time.SpanSugar._
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
+import org.apache.kyuubi.engine.flink.WithFlinkTestResources
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
-import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
-import org.apache.kyuubi.service.ServiceState._
 
-class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
-  override def withKyuubiConf: Map[String, String] =
-    Map(OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name)
-
-  override protected def jdbcUrl: String =
-    s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
-
-  ignore("release session if shared level is CONNECTION") {
-    logger.info(s"jdbc url is $jdbcUrl")
-    assert(engine.getServiceState == STARTED)
-    withJdbcStatement() { _ => }
-    eventually(Timeout(20.seconds)) {
-      assert(engine.getServiceState == STOPPED)
-    }
-  }
+abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTestResources {
 
   test("get catalogs") {
     withJdbcStatement() { statement =>
@@ -784,7 +767,8 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("select map ['k1', 'v1', 'k2', 
'v2']")
       assert(resultSet.next())
-      assert(resultSet.getString(1) == "{k1=v1, k2=v2}")
+      assert(List("{k1=v1, k2=v2}", "{k2=v2, k1=v1}")
+        .contains(resultSet.getString(1)))
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.JAVA_OBJECT)
     }
@@ -966,16 +950,34 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
-  test("execute statement - insert into") {
+  test("execute statement - batch insert into") {
     withMultipleConnectionJdbcStatement() { statement =>
       statement.executeQuery("create table tbl_a (a int) with ('connector' = 
'blackhole')")
       val resultSet = statement.executeQuery("insert into tbl_a select 1")
       val metadata = resultSet.getMetaData
-      assert(metadata.getColumnName(1) == 
"default_catalog.default_database.tbl_a")
-      assert(metadata.getColumnType(1) == java.sql.Types.BIGINT)
+      assert(metadata.getColumnName(1) === "result")
+      assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
       assert(resultSet.next())
-      assert(resultSet.getLong(1) == -1L)
-    }
+      assert(resultSet.getString(1).length == 32)
+    }
+  }
+
+  test("execute statement - streaming insert into") {
+    withMultipleConnectionJdbcStatement() { statement =>
+      // Flink currently doesn't support stop job statement, thus use a finite stream
+      statement.executeQuery(
+        "create table tbl_a (a int) with (" +
+          "'connector' = 'datagen', " +
+          "'rows-per-second'='10', " +
+          "'number-of-rows'='100')")
+      statement.executeQuery("create table tbl_b (a int) with ('connector' = 'blackhole')")
+      val resultSet = statement.executeQuery("insert into tbl_b select * from tbl_a")
+      val metadata = resultSet.getMetaData
+      assert(metadata.getColumnName(1) === "result")
+      assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
+      assert(resultSet.next())
+      assert(resultSet.getString(1).length == 32)
+    }
   }
 
   test("execute statement - set properties") {
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
index 1194f3582..1657f21f6 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
@@ -20,10 +20,10 @@ package org.apache.kyuubi.engine.flink.operation
 import java.sql.Statement
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
+import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
 import org.apache.kyuubi.operation.{AnalyzeMode, ExecutionMode, HiveJDBCTestHelper, ParseMode, PhysicalMode}
 
-class PlanOnlyOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
+class PlanOnlyOperationSuite extends WithFlinkSQLEngineLocal with HiveJDBCTestHelper {
 
   override def withKyuubiConf: Map[String, String] =
     Map(
diff --git a/integration-tests/kyuubi-flink-it/pom.xml b/integration-tests/kyuubi-flink-it/pom.xml
index c6a55c62c..eada7841c 100644
--- a/integration-tests/kyuubi-flink-it/pom.xml
+++ b/integration-tests/kyuubi-flink-it/pom.xml
@@ -79,6 +79,37 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- YARN -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-minicluster</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcprov-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.bouncycastle</groupId>
+            <artifactId>bcpkix-jdk15on</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.activation</groupId>
+            <artifactId>jakarta.activation-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>jakarta.xml.bind</groupId>
+            <artifactId>jakarta.xml.bind-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
 </project>
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/WithKyuubiServerAndYarnMiniCluster.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/WithKyuubiServerAndYarnMiniCluster.scala
new file mode 100644
index 000000000..de9a8ae2d
--- /dev/null
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/WithKyuubiServerAndYarnMiniCluster.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink
+
+import java.io.{File, FileWriter}
+import java.nio.file.Paths
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils, WithKyuubiServer}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.server.{MiniDFSService, MiniYarnService}
+
+trait WithKyuubiServerAndYarnMiniCluster extends KyuubiFunSuite with WithKyuubiServer {
+
+  val kyuubiHome: String = Utils.getCodeSourceLocation(getClass).split("integration-tests").head
+
+  override protected val conf: KyuubiConf = new KyuubiConf(false)
+
+  protected var miniHdfsService: MiniDFSService = _
+
+  protected var miniYarnService: MiniYarnService = _
+
+  private val yarnConf: YarnConfiguration = {
+    val yarnConfig = new YarnConfiguration()
+
+    // configurations copied from org.apache.flink.yarn.YarnTestBase
+    yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32)
+    yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096)
+
+    yarnConfig.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true)
+    yarnConfig.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2)
+    yarnConfig.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2)
+    yarnConfig.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4)
+    yarnConfig.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600)
+    yarnConfig.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false)
+    // memory is overwritten in the MiniYARNCluster,
+    // so we have to change the number of cores for testing.
+    yarnConfig.setInt(YarnConfiguration.NM_VCORES, 666)
+    yarnConfig.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 99.0f)
+    yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS, 1000)
+    yarnConfig.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, 5000)
+
+    // capacity-scheduler.xml is missing in hadoop-client-minicluster, so this is a workaround
+    yarnConfig.set("yarn.scheduler.capacity.root.queues", "default,four_cores_queue")
+
+    yarnConfig.setInt("yarn.scheduler.capacity.root.default.capacity", 100)
+    yarnConfig.setFloat("yarn.scheduler.capacity.root.default.user-limit-factor", 1)
+    yarnConfig.setInt("yarn.scheduler.capacity.root.default.maximum-capacity", 100)
+    yarnConfig.set("yarn.scheduler.capacity.root.default.state", "RUNNING")
+    yarnConfig.set("yarn.scheduler.capacity.root.default.acl_submit_applications", "*")
+    yarnConfig.set("yarn.scheduler.capacity.root.default.acl_administer_queue", "*")
+
+    yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-capacity", 100)
+    yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-applications", 10)
+    yarnConfig.setInt("yarn.scheduler.capacity.root.four_cores_queue.maximum-allocation-vcores", 4)
+    yarnConfig.setFloat("yarn.scheduler.capacity.root.four_cores_queue.user-limit-factor", 1)
+    yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_submit_applications", "*")
+    yarnConfig.set("yarn.scheduler.capacity.root.four_cores_queue.acl_administer_queue", "*")
+
+    yarnConfig.setInt("yarn.scheduler.capacity.node-locality-delay", -1)
+    // Set bind host to localhost to avoid java.net.BindException
+    yarnConfig.set(YarnConfiguration.RM_BIND_HOST, "localhost")
+    yarnConfig.set(YarnConfiguration.NM_BIND_HOST, "localhost")
+
+    yarnConfig
+  }
+
+  override def beforeAll(): Unit = {
+    miniHdfsService = new MiniDFSService()
+    miniHdfsService.initialize(conf)
+    miniHdfsService.start()
+
+    val hdfsServiceUrl = s"hdfs://localhost:${miniHdfsService.getDFSPort}"
+    yarnConf.set("fs.defaultFS", hdfsServiceUrl)
+    yarnConf.addResource(miniHdfsService.getHadoopConf)
+
+    val cp = System.getProperty("java.class.path")
+    // exclude kyuubi flink engine jar that has SPI for EmbeddedExecutorFactory
+    // which can't be initialized on the client side
+    val hadoopJars = cp.split(":").filter(s => !s.contains("flink"))
+    val hadoopClasspath = hadoopJars.mkString(":")
+    yarnConf.set("yarn.application.classpath", hadoopClasspath)
+
+    miniYarnService = new MiniYarnService()
+    miniYarnService.setYarnConf(yarnConf)
+    miniYarnService.initialize(conf)
+    miniYarnService.start()
+
+    val hadoopConfDir = Utils.createTempDir().toFile
+    val writer = new FileWriter(new File(hadoopConfDir, "core-site.xml"))
+    yarnConf.writeXml(writer)
+    writer.close()
+
+    val flinkHome = {
+      val candidates = Paths.get(kyuubiHome, "externals", "kyuubi-download", "target")
+        .toFile.listFiles(f => f.getName.contains("flink"))
+      if (candidates == null) None else candidates.map(_.toPath).headOption
+    }
+    if (flinkHome.isEmpty) {
+      throw new IllegalStateException(s"Flink home not found in $kyuubiHome/externals")
+    }
+
+    conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.KYUUBI_HOME", kyuubiHome)
+    conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.FLINK_HOME", flinkHome.get.toString)
+    conf.set(
+      s"$KYUUBI_ENGINE_ENV_PREFIX.FLINK_CONF_DIR",
+      s"${flinkHome.get.toString}${File.separator}conf")
+    conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CLASSPATH", hadoopClasspath)
+    conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", 
hadoopConfDir.getAbsolutePath)
+    conf.set(s"flink.containerized.master.env.HADOOP_CLASSPATH", 
hadoopClasspath)
+    conf.set(s"flink.containerized.master.env.HADOOP_CONF_DIR", 
hadoopConfDir.getAbsolutePath)
+    conf.set(s"flink.containerized.taskmanager.env.HADOOP_CONF_DIR", 
hadoopConfDir.getAbsolutePath)
+
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    if (miniYarnService != null) {
+      miniYarnService.stop()
+      miniYarnService = null
+    }
+    if (miniHdfsService != null) {
+      miniHdfsService.stop()
+      miniHdfsService = null
+    }
+  }
+}
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
new file mode 100644
index 000000000..afa4dce8f
--- /dev/null
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuiteOnYarn.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink.operation
+
+import org.apache.hive.service.rpc.thrift.{TGetInfoReq, TGetInfoType}
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.it.flink.WithKyuubiServerAndYarnMiniCluster
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
+
+class FlinkOperationSuiteOnYarn extends WithKyuubiServerAndYarnMiniCluster
+  with HiveJDBCTestHelper {
+
+  override protected def jdbcUrl: String = {
+    // delay the access to thrift service because the thrift service
+    // may not be ready although it's registered
+    Thread.sleep(3000L)
+    getJdbcUrl
+  }
+
+  override def beforeAll(): Unit = {
+    conf
+      .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
+      .set(ENGINE_TYPE, "FLINK_SQL")
+      .set("flink.execution.target", "yarn-application")
+      .set("flink.parallelism.default", "6")
+    super.beforeAll()
+  }
+
+  test("get catalogs for flink sql") {
+    withJdbcStatement() { statement =>
+      val meta = statement.getConnection.getMetaData
+      val catalogs = meta.getCatalogs
+      val expected = Set("default_catalog").toIterator
+      while (catalogs.next()) {
+        assert(catalogs.getString(TABLE_CAT) === expected.next())
+      }
+      assert(!expected.hasNext)
+      assert(!catalogs.next())
+    }
+  }
+
+  test("execute statement - create/alter/drop table") {
+    withJdbcStatement() { statement =>
+      statement.executeQuery("create table tbl_a (a string) with ('connector' 
= 'blackhole')")
+      assert(statement.execute("alter table tbl_a rename to tbl_b"))
+      assert(statement.execute("drop table tbl_b"))
+    }
+  }
+
+  test("execute statement - select column name with dots") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery("select 'tmp.hello'")
+      assert(resultSet.next())
+      assert(resultSet.getString(1) === "tmp.hello")
+    }
+  }
+
+  test("set kyuubi conf into flink conf") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery("SET")
+      // Flink does not support set key without value currently,
+      // thus read all rows to find the desired one
+      var success = false
+      while (resultSet.next() && !success) {
+        if (resultSet.getString(1) == "parallelism.default" &&
+          resultSet.getString(2) == "6") {
+          success = true
+        }
+      }
+      assert(success)
+    }
+  }
+
+  test("server info provider - server") {
+    withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "SERVER"))()() {
+      withSessionHandle { (client, handle) =>
+        val req = new TGetInfoReq()
+        req.setSessionHandle(handle)
+        req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+        assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Kyuubi")
+      }
+    }
+  }
+
+  test("server info provider - engine") {
+    withSessionConf(Map(KyuubiConf.SERVER_INFO_PROVIDER.key -> "ENGINE"))()() {
+      withSessionHandle { (client, handle) =>
+        val req = new TGetInfoReq()
+        req.setSessionHandle(handle)
+        req.setInfoType(TGetInfoType.CLI_DBMS_NAME)
+        assert(client.GetInfo(req).getInfoValue.getStringValue === "Apache Flink")
+      }
+    }
+  }
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index b5229e2ad..2634bb4ab 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -2367,14 +2367,14 @@ object KyuubiConf {
 
   val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
     buildConf("kyuubi.engine.flink.memory")
-      .doc("The heap memory for the Flink SQL engine")
+      .doc("The heap memory for the Flink SQL engine. Only effective in yarn 
session mode.")
       .version("1.6.0")
       .stringConf
       .createWithDefault("1g")
 
   val ENGINE_FLINK_JAVA_OPTIONS: OptionalConfigEntry[String] =
     buildConf("kyuubi.engine.flink.java.options")
-      .doc("The extra Java options for the Flink SQL engine")
+      .doc("The extra Java options for the Flink SQL engine. Only effective in 
yarn session mode.")
       .version("1.6.0")
       .stringConf
       .createOptional
@@ -2382,11 +2382,19 @@ object KyuubiConf {
   val ENGINE_FLINK_EXTRA_CLASSPATH: OptionalConfigEntry[String] =
     buildConf("kyuubi.engine.flink.extra.classpath")
       .doc("The extra classpath for the Flink SQL engine, for configuring the 
location" +
-        " of hadoop client jars, etc")
+        " of hadoop client jars, etc. Only effective in yarn session mode.")
       .version("1.6.0")
       .stringConf
       .createOptional
 
+  val ENGINE_FLINK_APPLICATION_JARS: OptionalConfigEntry[String] =
+    buildConf("kyuubi.engine.flink.application.jars")
+      .doc("A comma-separated list of the local jars to be shipped with the 
job to the cluster. " +
+        "For example, SQL UDF jars. Only effective in yarn application mode.")
+      .version("1.8.0")
+      .stringConf
+      .createOptional
+
   val SERVER_LIMIT_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
     buildConf("kyuubi.server.limit.connections.per.user")
       .doc("Maximum kyuubi server connections per user." +
diff --git a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
index bdb9b12fe..a1b1466d1 100644
--- a/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
+++ b/kyuubi-ha/src/main/scala/org/apache/kyuubi/ha/client/ServiceDiscovery.scala
@@ -60,6 +60,7 @@ abstract class ServiceDiscovery(
 
   override def start(): Unit = {
     discoveryClient.registerService(conf, namespace, this)
+    info(s"Registered $name in namespace ${_namespace}.")
     super.start()
   }
 
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 63b37f1c5..b2b3ce909 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -187,7 +187,7 @@ private[kyuubi] class EngineRef(
         conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
         new SparkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
       case FLINK_SQL =>
-        conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
+        conf.setIfMissing(FlinkProcessBuilder.YARN_APP_KEY, defaultEngineName)
         new FlinkProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
       case TRINO =>
         new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 9b23e550d..02aed2866 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -105,10 +105,10 @@ object KyuubiApplicationManager {
     conf.set("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY, tag)
   }
 
-  private def setupFlinkK8sTag(tag: String, conf: KyuubiConf): Unit = {
-    val originalTag = conf.getOption(FlinkProcessBuilder.TAG_KEY).map(_ + ",").getOrElse("")
+  private def setupFlinkYarnTag(tag: String, conf: KyuubiConf): Unit = {
+    val originalTag = conf.getOption(FlinkProcessBuilder.YARN_TAG_KEY).map(_ + ",").getOrElse("")
     val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
-    conf.set(FlinkProcessBuilder.TAG_KEY, newTag)
+    conf.set(FlinkProcessBuilder.YARN_TAG_KEY, newTag)
   }
 
   val uploadWorkDir: Path = {
@@ -178,7 +178,7 @@ object KyuubiApplicationManager {
         setupSparkK8sTag(applicationTag, conf)
       case ("FLINK", _) =>
         // running flink on other platforms is not yet supported
-        setupFlinkK8sTag(applicationTag, conf)
+        setupFlinkYarnTag(applicationTag, conf)
       // other engine types are running locally yet
       case _ =>
     }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index b8146c4d2..8642d87d7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -21,7 +21,7 @@ import java.io.{File, FilenameFilter}
 import java.nio.file.{Files, Paths}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import com.google.common.annotations.VisibleForTesting
 
@@ -50,88 +50,150 @@ class FlinkProcessBuilder(
 
   val flinkHome: String = getEngineHome(shortName)
 
+  val flinkExecutable: String = {
+    Paths.get(flinkHome, "bin", FLINK_EXEC_FILE).toFile.getCanonicalPath
+  }
+
   override protected def module: String = "kyuubi-flink-sql-engine"
 
   override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
 
   override def env: Map[String, String] = conf.getEnvs +
-    (FLINK_PROXY_USER_KEY -> proxyUser)
+    ("FLINK_CONF_DIR" -> conf.getEnvs.getOrElse(
+      "FLINK_CONF_DIR",
+      s"$flinkHome${File.separator}conf"))
+
+  override def clusterManager(): Option[String] = Some("yarn")
 
   override protected val commands: Array[String] = {
     KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
-    val buffer = new ArrayBuffer[String]()
-    buffer += executable
-
-    val memory = conf.get(ENGINE_FLINK_MEMORY)
-    buffer += s"-Xmx$memory"
-    val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
-    if (javaOptions.isDefined) {
-      buffer += javaOptions.get
-    }
 
-    buffer += "-cp"
-    val classpathEntries = new java.util.LinkedHashSet[String]
-    // flink engine runtime jar
-    mainResource.foreach(classpathEntries.add)
-    // flink sql client jar
-    val flinkSqlClientPath = Paths.get(flinkHome)
-      .resolve("opt")
-      .toFile
-      .listFiles(new FilenameFilter {
-        override def accept(dir: File, name: String): Boolean = {
-          name.toLowerCase.startsWith("flink-sql-client")
+    // flink.execution.target is required in the Kyuubi conf currently
+    val executionTarget = conf.getOption("flink.execution.target")
+    executionTarget match {
+      case Some("yarn-application") =>
+        val buffer = new ArrayBuffer[String]()
+        buffer += flinkExecutable
+        buffer += "run-application"
+
+        val flinkExtraJars = new ListBuffer[String]
+        // locate flink sql jars
+        val flinkSqlJars = Paths.get(flinkHome)
+          .resolve("opt")
+          .toFile
+          .listFiles(new FilenameFilter {
+            override def accept(dir: File, name: String): Boolean = {
+              name.toLowerCase.startsWith("flink-sql-client") ||
+              name.toLowerCase.startsWith("flink-sql-gateway")
+            }
+          }).map(f => f.getAbsolutePath).sorted
+        flinkExtraJars ++= flinkSqlJars
+
+        val userJars = conf.get(ENGINE_FLINK_APPLICATION_JARS)
+        userJars.foreach(jars => flinkExtraJars ++= jars.split(","))
+
+        buffer += "-t"
+        buffer += "yarn-application"
+        buffer += s"-Dyarn.ship-files=${flinkExtraJars.mkString(";")}"
+        buffer += s"-Dyarn.tags=${conf.getOption(YARN_TAG_KEY).get}"
+        buffer += "-Dcontainerized.master.env.FLINK_CONF_DIR=."
+
+        val customFlinkConf = conf.getAllWithPrefix("flink", "")
+        customFlinkConf.foreach { case (k, v) =>
+          buffer += s"-D$k=$v"
         }
-      }).head.getAbsolutePath
-    classpathEntries.add(flinkSqlClientPath)
-
-    // jars from flink lib
-    classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
-
-    // classpath contains flink configurations, default to flink.home/conf
-    classpathEntries.add(env.getOrElse("FLINK_CONF_DIR", s"$flinkHome${File.separator}conf"))
-    // classpath contains hadoop configurations
-    env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
-    env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
-    env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
-    val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
-    hadoopCp.foreach(classpathEntries.add)
-    val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
-    extraCp.foreach(classpathEntries.add)
-    if (hadoopCp.isEmpty && extraCp.isEmpty) {
-      warn(s"The conf of ${FLINK_HADOOP_CLASSPATH_KEY} and 
${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
-        s" is empty.")
-      debug("Detected development environment")
-      mainResource.foreach { path =>
-        val devHadoopJars = Paths.get(path).getParent
-          .resolve(s"scala-$SCALA_COMPILE_VERSION")
-          .resolve("jars")
-        if (!Files.exists(devHadoopJars)) {
-          throw new KyuubiException(s"The path $devHadoopJars does not exists. " +
-            s"Please set ${FLINK_HADOOP_CLASSPATH_KEY} or ${ENGINE_FLINK_EXTRA_CLASSPATH.key} " +
-            s"for configuring location of hadoop client jars, etc")
+
+        buffer += "-c"
+        buffer += s"$mainClass"
+        buffer += s"${mainResource.get}"
+
+        buffer += "--conf"
+        buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+        conf.getAll.foreach { case (k, v) =>
+          if (k.startsWith("kyuubi.")) {
+            buffer += "--conf"
+            buffer += s"$k=$v"
+          }
         }
-        classpathEntries.add(s"$devHadoopJars${File.separator}*")
-      }
-    }
-    buffer += classpathEntries.asScala.mkString(File.pathSeparator)
-    buffer += mainClass
 
-    buffer += "--conf"
-    buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+        buffer.toArray
+
+      case _ =>
+        val buffer = new ArrayBuffer[String]()
+        buffer += executable
 
-    for ((k, v) <- conf.getAll) {
-      buffer += "--conf"
-      buffer += s"$k=$v"
+        val memory = conf.get(ENGINE_FLINK_MEMORY)
+        buffer += s"-Xmx$memory"
+        val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
+        if (javaOptions.isDefined) {
+          buffer += javaOptions.get
+        }
+
+        buffer += "-cp"
+        val classpathEntries = new java.util.LinkedHashSet[String]
+        // flink engine runtime jar
+        mainResource.foreach(classpathEntries.add)
+        // flink sql client jar
+        val flinkSqlClientPath = Paths.get(flinkHome)
+          .resolve("opt")
+          .toFile
+          .listFiles(new FilenameFilter {
+            override def accept(dir: File, name: String): Boolean = {
+              name.toLowerCase.startsWith("flink-sql-client")
+            }
+          }).head.getAbsolutePath
+        classpathEntries.add(flinkSqlClientPath)
+
+        // jars from flink lib
+        classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
+
+        // classpath contains flink configurations, default to flink.home/conf
+        classpathEntries.add(env.getOrElse("FLINK_CONF_DIR", s"$flinkHome${File.separator}conf"))
+        // classpath contains hadoop configurations
+        env.get("HADOOP_CONF_DIR").foreach(classpathEntries.add)
+        env.get("YARN_CONF_DIR").foreach(classpathEntries.add)
+        env.get("HBASE_CONF_DIR").foreach(classpathEntries.add)
+        val hadoopCp = env.get(FLINK_HADOOP_CLASSPATH_KEY)
+        hadoopCp.foreach(classpathEntries.add)
+        val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+        extraCp.foreach(classpathEntries.add)
+        if (hadoopCp.isEmpty && extraCp.isEmpty) {
+          warn(s"The conf of ${FLINK_HADOOP_CLASSPATH_KEY} and " +
+            s"${ENGINE_FLINK_EXTRA_CLASSPATH.key} is empty.")
+          debug("Detected development environment.")
+          mainResource.foreach { path =>
+            val devHadoopJars = Paths.get(path).getParent
+              .resolve(s"scala-$SCALA_COMPILE_VERSION")
+              .resolve("jars")
+            if (!Files.exists(devHadoopJars)) {
+              throw new KyuubiException(s"The path $devHadoopJars does not exist. " +
+                s"Please set ${FLINK_HADOOP_CLASSPATH_KEY} or ${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
+                s" for configuring the location of hadoop client jars, etc.")
+            }
+            classpathEntries.add(s"$devHadoopJars${File.separator}*")
+          }
+        }
+        buffer += classpathEntries.asScala.mkString(File.pathSeparator)
+        buffer += mainClass
+
+        buffer += "--conf"
+        buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
+
+        conf.getAll.foreach { case (k, v) =>
+          buffer += "--conf"
+          buffer += s"$k=$v"
+        }
+        buffer.toArray
     }
-    buffer.toArray
   }
 
   override def shortName: String = "flink"
 }
 
 object FlinkProcessBuilder {
-  final val APP_KEY = "yarn.application.name"
-  final val TAG_KEY = "yarn.tags"
+  final val FLINK_EXEC_FILE = "flink"
+  final val YARN_APP_KEY = "yarn.application.name"
+  final val YARN_TAG_KEY = "yarn.tags"
   final val FLINK_HADOOP_CLASSPATH_KEY = "FLINK_HADOOP_CLASSPATH"
   final val FLINK_PROXY_USER_KEY = "HADOOP_PROXY_USER"
 }
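
For illustration, a minimal sketch of the "-D" forwarding performed by the yarn-application branch above, assuming only the getAllWithPrefix behavior shown in this diff (the memory key is an example Flink option, not one set by this commit):

    import org.apache.kyuubi.config.KyuubiConf

    val conf = KyuubiConf()
      .set("flink.execution.target", "yarn-application")
      .set("flink.taskmanager.memory.process.size", "2g")
    // Every "flink."-prefixed entry is stripped of the prefix and passed
    // to the Flink CLI as a -D option.
    val dArgs = conf.getAllWithPrefix("flink", "").map { case (k, v) => s"-D$k=$v" }
    // yields e.g. -Dexecution.target=yarn-application
    //             -Dtaskmanager.memory.process.size=2g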
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 7ee38d4ef..53450b589 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.kyuubi.engine.flink
 
 import java.io.File
+import java.nio.file.{Files, Paths}
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable.ListMap
@@ -25,18 +26,36 @@ import scala.util.matching.Regex
 
 import org.apache.kyuubi.KyuubiFunSuite
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
 import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
 
 class FlinkProcessBuilderSuite extends KyuubiFunSuite {
-  private def conf = KyuubiConf().set("kyuubi.on", "off")
+  private def sessionModeConf = KyuubiConf()
+    .set("flink.execution.target", "yarn-session")
+    .set("kyuubi.on", "off")
     .set(ENGINE_FLINK_MEMORY, "512m")
     .set(
       ENGINE_FLINK_JAVA_OPTIONS,
       "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
 
+  private def applicationModeConf = KyuubiConf()
+    .set("flink.execution.target", "yarn-application")
+    .set(ENGINE_FLINK_APPLICATION_JARS, tempUdfJar.toString)
+    .set("kyuubi.on", "off")
+
+  private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
+  private val tempOpt =
+    Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "opt")).toFile
+  Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-client-1.16.1.jar"))
+  Files.createFile(Paths.get(tempOpt.toPath.toString, "flink-sql-gateway-1.16.1.jar"))
+  private val tempUsrLib =
+    Files.createDirectories(Paths.get(tempFlinkHome.toPath.toString, "usrlib")).toFile
+  private val tempUdfJar =
+    Files.createFile(Paths.get(tempUsrLib.toPath.toString, "test-udf.jar"))
+
   private def envDefault: ListMap[String, String] = ListMap(
-    "JAVA_HOME" -> s"${File.separator}jdk")
+    "JAVA_HOME" -> s"${File.separator}jdk",
+    "FLINK_HOME" -> s"${tempFlinkHome.toPath}")
   private def envWithoutHadoopCLASSPATH: ListMap[String, String] = envDefault +
     ("HADOOP_CONF_DIR" -> s"${File.separator}hadoop${File.separator}conf") +
     ("YARN_CONF_DIR" -> s"${File.separator}yarn${File.separator}conf") +
@@ -44,11 +63,12 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
     (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
   private def confStr: String = {
-    conf.clone.set("yarn.tags", "KYUUBI").getAll
+    sessionModeConf.clone.set("yarn.tags", "KYUUBI").getAll
       .map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
       .mkString(" ")
   }
-  private def matchActualAndExpected(builder: FlinkProcessBuilder): Unit = {
+
+  private def matchActualAndExpectedSessionMode(builder: FlinkProcessBuilder): Unit = {
     val actualCommands = builder.toString
     val classpathStr = constructClasspathStr(builder)
     val expectedCommands =
@@ -59,6 +79,27 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     assert(matcher.matches())
   }
 
+  private def matchActualAndExpectedApplicationMode(builder: FlinkProcessBuilder): Unit = {
+    val actualCommands = builder.toString
+    val expectedCommands =
+      escapePaths(s"${builder.flinkExecutable} run-application ") +
+        s"-t yarn-application " +
+        s"-Dyarn.ship-files=.*\\/flink-sql-client.*jar;.*\\/flink-sql-gateway.*jar;$tempUdfJar " +
+        s"-Dyarn\\.tags=KYUUBI " +
+        s"-Dcontainerized\\.master\\.env\\.FLINK_CONF_DIR=\\. " +
+        s"-Dexecution.target=yarn-application " +
+        s"-c org\\.apache\\.kyuubi\\.engine\\.flink\\.FlinkSQLEngine " +
+        s".*kyuubi-flink-sql-engine_.*jar" +
+        s"(?: \\\\\\n\\t--conf \\S+=\\S+)+"
+    val regex = new Regex(expectedCommands)
+    val matcher = regex.pattern.matcher(actualCommands)
+    assert(matcher.matches())
+  }
+
+  private def escapePaths(path: String): String = {
+    path.replaceAll("/", "\\/")
+  }
+
   private def constructClasspathStr(builder: FlinkProcessBuilder) = {
     val classpathEntries = new java.util.LinkedHashSet[String]
     builder.mainResource.foreach(classpathEntries.add)
@@ -69,11 +110,11 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
     classpathEntries.add(s"$flinkHome$flinkConfPathSuffix")
     val envMap = builder.env
     envMap.foreach { case (k, v) =>
-      if (!k.equals("JAVA_HOME")) {
+      if (!k.equals("JAVA_HOME") && !k.equals("FLINK_HOME")) {
         classpathEntries.add(v)
       }
     }
-    val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+    val extraCp = sessionModeConf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
     extraCp.foreach(classpathEntries.add)
     val classpathStr = classpathEntries.asScala.mkString(File.pathSeparator)
     classpathStr
@@ -86,18 +127,25 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private val flinkConfPathSuffix = s"${File.separator}conf"
   private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
 
-  test("all hadoop related environment variables are configured") {
-    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+  test("session mode - all hadoop related environment variables are 
configured") {
+    val builder = new FlinkProcessBuilder("vinoyang", sessionModeConf) {
       override def env: Map[String, String] = envWithAllHadoop
     }
-    matchActualAndExpected(builder)
+    matchActualAndExpectedSessionMode(builder)
   }
 
-  test("only FLINK_HADOOP_CLASSPATH environment variables are configured") {
-    val builder = new FlinkProcessBuilder("vinoyang", conf) {
+  test("session mode - only FLINK_HADOOP_CLASSPATH environment variables are 
configured") {
+    val builder = new FlinkProcessBuilder("vinoyang", sessionModeConf) {
       override def env: Map[String, String] = envDefault +
         (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
     }
-    matchActualAndExpected(builder)
+    matchActualAndExpectedSessionMode(builder)
+  }
+
+  test("application mode - default env") {
+    val builder = new FlinkProcessBuilder("paullam", applicationModeConf) {
+      override def env: Map[String, String] = envDefault
+    }
+    matchActualAndExpectedApplicationMode(builder)
   }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index 1a73cc24c..68a175efc 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -34,7 +34,7 @@ import org.apache.kyuubi.service.AbstractService
 class MiniYarnService extends AbstractService("TestMiniYarnService") {
 
   private val hadoopConfDir: File = Utils.createTempDir().toFile
-  private val yarnConf: YarnConfiguration = {
+  private var yarnConf: YarnConfiguration = {
     val yarnConfig = new YarnConfiguration()
     // Disable the disk utilization check to avoid the test hanging when people's disks are
     // getting full.
@@ -71,6 +71,10 @@ class MiniYarnService extends AbstractService("TestMiniYarnService") {
   }
   private val yarnCluster: MiniYARNCluster = new MiniYARNCluster(getName, 1, 1, 1)
 
+  def setYarnConf(yarnConf: YarnConfiguration): Unit = {
+    this.yarnConf = yarnConf
+  }
+
   override def initialize(conf: KyuubiConf): Unit = {
     yarnCluster.init(yarnConf)
     super.initialize(conf)
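
A minimal usage sketch of the new setter, mirroring the wiring in WithKyuubiServerAndYarnMiniCluster above (note that setYarnConf must be called before initialize, since initialize passes yarnConf to the cluster):

    import org.apache.hadoop.yarn.conf.YarnConfiguration
    import org.apache.kyuubi.config.KyuubiConf
    import org.apache.kyuubi.server.MiniYarnService

    // Inject a custom YarnConfiguration, then bring the mini cluster up.
    val yarnService = new MiniYarnService()
    yarnService.setYarnConf(new YarnConfiguration())
    yarnService.initialize(new KyuubiConf(false))
    yarnService.start()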

