This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 30845617fe1 [FLINK-36759][sql-gateway] Add REST API to deploy script 
in application mode (#25730)
30845617fe1 is described below

commit 30845617fe12f3532228a2bf5a1539dc7cba4b30
Author: Shengkai <33114724+fsk...@users.noreply.github.com>
AuthorDate: Sat Jan 4 23:17:40 2025 +0800

    [FLINK-36759][sql-gateway] Add REST API to deploy script in application 
mode (#25730)
---
 .../flink/client/cli/ApplicationDeployer.java      |   2 +-
 .../cli/ApplicationClusterDeployer.java            |   8 +-
 .../flink/client/program/TestingClusterClient.java |   2 +-
 flink-end-to-end-tests/run-nightly-tests.sh        |   1 +
 flink-end-to-end-tests/test-scripts/common.sh      |   6 +
 .../test_kubernetes_sql_application.sh             |  78 +++++
 .../flink-sql-client/src/test/resources/sql/set.q  | 137 ++++----
 .../flink/table/gateway/api/SqlGatewayService.java |  21 ++
 .../gateway/api/utils/MockedSqlGatewayService.java |  11 +
 .../table/gateway/rest/SqlGatewayRestEndpoint.java |  11 +
 .../handler/application/DeployScriptHandler.java   |  75 +++++
 .../header/application/DeployScriptHeaders.java    |  91 +++++
 .../application/DeployScriptRequestBody.java       |  71 ++++
 .../application/DeployScriptResponseBody.java      |  42 +++
 .../gateway/service/SqlGatewayServiceImpl.java     |  47 +++
 .../gateway/service/context/DefaultContext.java    |   2 +
 .../table/gateway/rest/DeployScriptITCase.java     | 371 +++++++++++++++++++++
 .../service/application/ScriptRunnerITCase.java    |   4 +-
 .../service/utils/SqlGatewayServiceExtension.java  |  13 +-
 ...he.flink.client.deployment.ClusterClientFactory |  16 +
 .../flink-sql-gateway/src/test/resources/sql/set.q |  50 +--
 .../resources/sql_gateway_rest_api_v4.snapshot     |  40 +++
 .../flink/table/runtime/application/SqlDriver.java |  16 +-
 23 files changed, 1009 insertions(+), 106 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java
 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java
index f09faccf3fc..92de8afb1e0 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java
@@ -34,7 +34,7 @@ public interface ApplicationDeployer {
      * @param applicationConfiguration an {@link ApplicationConfiguration} 
specific to the
      *     application to be executed.
      */
-    <ClusterID> void run(
+    <ClusterID> ClusterID run(
             final Configuration configuration,
             final ApplicationConfiguration applicationConfiguration)
             throws Exception;
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java
index b62060f77b6..e59bb22505d 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java
@@ -48,7 +48,7 @@ public class ApplicationClusterDeployer implements 
ApplicationDeployer {
         this.clientServiceLoader = checkNotNull(clientServiceLoader);
     }
 
-    public <ClusterID> void run(
+    public <ClusterID> ClusterID run(
             final Configuration configuration,
             final ApplicationConfiguration applicationConfiguration)
             throws Exception {
@@ -64,8 +64,10 @@ public class ApplicationClusterDeployer implements 
ApplicationDeployer {
             final ClusterSpecification clusterSpecification =
                     clientFactory.getClusterSpecification(configuration);
 
-            clusterDescriptor.deployApplicationCluster(
-                    clusterSpecification, applicationConfiguration);
+            return clusterDescriptor
+                    .deployApplicationCluster(clusterSpecification, 
applicationConfiguration)
+                    .getClusterClient()
+                    .getClusterId();
         }
     }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index 0fa803ea8b9..0e31c24af94 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -113,7 +113,7 @@ public class TestingClusterClient<T> implements 
ClusterClient<T> {
 
     @Override
     public T getClusterId() {
-        throw new UnsupportedOperationException();
+        return (T) "test-cluster";
     }
 
     @Override
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 963903c3257..dcd1fe633d8 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -181,6 +181,7 @@ function run_group_2 {
     run_test "Streaming SQL end-to-end test using planner loader" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
     run_test "Streaming SQL end-to-end test using planner with Scala version" 
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" 
"skip_check_exceptions"
     run_test "Sql Jdbc Driver end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_sql_jdbc_driver.sh" "skip_check_exceptions"
+    run_test "Run kubernetes SQL application test" 
"$END_TO_END_DIR/test-scripts/test_kubernetes_sql_application.sh"
 
     run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" 
"skip_check_exceptions"
     run_test "Streaming File Sink s3 end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" 
"skip_check_exceptions"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh 
b/flink-end-to-end-tests/test-scripts/common.sh
index fe24d0ef01a..ab13b7e57ed 100755
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -533,10 +533,12 @@ function check_logs_for_non_empty_out_files {
 
 function shutdown_all {
   stop_cluster
+  stop_sql_gateway
   # stop TMs which started by command: bin/taskmanager.sh start
   "$FLINK_DIR"/bin/taskmanager.sh stop-all
   tm_kill_all
   jm_kill_all
+  gw_kill_all
 }
 
 function stop_cluster {
@@ -683,6 +685,10 @@ function tm_kill_all {
   kill_all 'TaskManagerRunner|TaskManager'
 }
 
+function gw_kill_all {
+  kill_all 'SqlGateway'
+}
+
 # Kills all processes that match the given name.
 function kill_all {
   local pid=`jps | grep -E "${1}" | cut -d " " -f 1 || true`
diff --git 
a/flink-end-to-end-tests/test-scripts/test_kubernetes_sql_application.sh 
b/flink-end-to-end-tests/test-scripts/test_kubernetes_sql_application.sh
new file mode 100755
index 00000000000..e109c9ffc84
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_sql_application.sh
@@ -0,0 +1,78 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+CLUSTER_ID="flink-native-k8s-sql-application-1"
+FLINK_IMAGE_NAME="test_kubernetes_application-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+function internal_cleanup {
+    kubectl delete deployment ${CLUSTER_ID}
+    kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+start_kubernetes
+
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+mkdir -p "$LOCAL_LOGS_PATH"
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+echo "[INFO] Submit SQL job in Application Mode"
+SESSION_HANDLE=`curl --silent --request POST http://localhost:8083/sessions | 
sed -n 's/.*"sessionHandle":\s*"\([^"]*\)".*/\1/p'`
+curl --location --request POST 
http://localhost:8083/sessions/${SESSION_HANDLE}/scripts \
+--header 'Content-Type: application/json' \
+--data-raw '{
+    "script": "CREATE TEMPORARY TABLE sink(a INT) WITH ( '\''connector'\'' = 
'\''blackhole'\''); INSERT INTO sink VALUES (1), (2), (3);",
+    "executionConfig": {
+        "execution.target": "kubernetes-application",
+        "kubernetes.cluster-id": "'${CLUSTER_ID}'",
+        "kubernetes.container.image.ref": "'${FLINK_IMAGE_NAME}'",
+        "jobmanager.memory.process.size": "1088m",
+        "taskmanager.memory.process.size": "1000m",
+        "kubernetes.jobmanager.cpu": 0.5,
+        "kubernetes.taskmanager.cpu": 0.5,
+        "kubernetes.rest-service.exposed.type": "NodePort"
+    }
+}'
+
+echo ""
+echo "[INFO] Wait job finishes"
+kubectl wait --for=condition=Available --timeout=60s deploy/${CLUSTER_ID} || 
exit 1
+jm_pod_name=$(kubectl get pods 
--selector="app=${CLUSTER_ID},component=jobmanager" -o 
jsonpath='{..metadata.name}')
+wait_rest_endpoint_up_k8s $jm_pod_name
+
+# The Flink cluster will be destroyed immediately once the job finished or 
failed. So we check jobmanager logs
+# instead of checking the result
+echo "[INFO] Check logs to verify job finishes"
+kubectl logs -f $jm_pod_name >$LOCAL_LOGS_PATH/jobmanager.log
+grep -E "Job [A-Za-z0-9]+ reached terminal state FINISHED" 
$LOCAL_LOGS_PATH/jobmanager.log
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q 
b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index 05f79218cb2..4189a9588a2 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -74,21 +74,22 @@ reset 'table.resources.download-dir';
 
 # list the configured configuration
 set;
-+-------------------------------------------------+-----------+
-|                                             key |     value |
-+-------------------------------------------------+-----------+
-|                              execution.attached |      true |
-|             execution.shutdown-on-attached-exit |     false |
-|             execution.state-recovery.claim-mode |  NO_CLAIM |
-| execution.state-recovery.ignore-unclaimed-state |     false |
-|                                execution.target |    remote |
-|                          jobmanager.rpc.address | 
$VAR_JOBMANAGER_RPC_ADDRESS |
-|                                       rest.port |     $VAR_REST_PORT |
-|              sql-client.display.print-time-cost |     false |
-|                sql-client.execution.result-mode |   tableau |
-|                table.exec.legacy-cast-behaviour |  DISABLED |
-+-------------------------------------------------+-----------+
-10 rows in set
++-------------------------------------------------+--------------+
+|                                             key |        value |
++-------------------------------------------------+--------------+
+|                 $internal.deployment.config-dir | /dummy/conf/ |
+|                              execution.attached |         true |
+|             execution.shutdown-on-attached-exit |        false |
+|             execution.state-recovery.claim-mode |     NO_CLAIM |
+| execution.state-recovery.ignore-unclaimed-state |        false |
+|                                execution.target |       remote |
+|                          jobmanager.rpc.address |    
$VAR_JOBMANAGER_RPC_ADDRESS |
+|                                       rest.port |        $VAR_REST_PORT |
+|              sql-client.display.print-time-cost |        false |
+|                sql-client.execution.result-mode |      tableau |
+|                table.exec.legacy-cast-behaviour |     DISABLED |
++-------------------------------------------------+--------------+
+11 rows in set
 !ok
 
 # reset the configuration
@@ -97,18 +98,19 @@ reset;
 !info
 
 set;
-+-------------------------------------------------+-----------+
-|                                             key |     value |
-+-------------------------------------------------+-----------+
-|                              execution.attached |      true |
-|             execution.shutdown-on-attached-exit |     false |
-|             execution.state-recovery.claim-mode |  NO_CLAIM |
-| execution.state-recovery.ignore-unclaimed-state |     false |
-|                                execution.target |    remote |
-|                          jobmanager.rpc.address | 
$VAR_JOBMANAGER_RPC_ADDRESS |
-|                                       rest.port |     $VAR_REST_PORT |
-+-------------------------------------------------+-----------+
-7 rows in set
++-------------------------------------------------+--------------+
+|                                             key |        value |
++-------------------------------------------------+--------------+
+|                 $internal.deployment.config-dir | /dummy/conf/ |
+|                              execution.attached |         true |
+|             execution.shutdown-on-attached-exit |        false |
+|             execution.state-recovery.claim-mode |     NO_CLAIM |
+| execution.state-recovery.ignore-unclaimed-state |        false |
+|                                execution.target |       remote |
+|                          jobmanager.rpc.address |    
$VAR_JOBMANAGER_RPC_ADDRESS |
+|                                       rest.port |        $VAR_REST_PORT |
++-------------------------------------------------+--------------+
+8 rows in set
 !ok
 
 # should fail because default dialect doesn't support hive dialect
@@ -136,19 +138,20 @@ set 'sql-client.verbose' = 'true';
 !info
 
 set;
-+-------------------------------------------------+-----------+
-|                                             key |     value |
-+-------------------------------------------------+-----------+
-|                              execution.attached |      true |
-|             execution.shutdown-on-attached-exit |     false |
-|             execution.state-recovery.claim-mode |  NO_CLAIM |
-| execution.state-recovery.ignore-unclaimed-state |     false |
-|                                execution.target |    remote |
-|                          jobmanager.rpc.address | 
$VAR_JOBMANAGER_RPC_ADDRESS |
-|                                       rest.port |     $VAR_REST_PORT |
-|                              sql-client.verbose |      true |
-+-------------------------------------------------+-----------+
-8 rows in set
++-------------------------------------------------+--------------+
+|                                             key |        value |
++-------------------------------------------------+--------------+
+|                 $internal.deployment.config-dir | /dummy/conf/ |
+|                              execution.attached |         true |
+|             execution.shutdown-on-attached-exit |        false |
+|             execution.state-recovery.claim-mode |     NO_CLAIM |
+| execution.state-recovery.ignore-unclaimed-state |        false |
+|                                execution.target |       remote |
+|                          jobmanager.rpc.address |    
$VAR_JOBMANAGER_RPC_ADDRESS |
+|                                       rest.port |        $VAR_REST_PORT |
+|                              sql-client.verbose |         true |
++-------------------------------------------------+--------------+
+9 rows in set
 !ok
 
 set 'execution.attached' = 'false';
@@ -160,19 +163,20 @@ reset 'execution.attached';
 !info
 
 set;
-+-------------------------------------------------+-----------+
-|                                             key |     value |
-+-------------------------------------------------+-----------+
-|                              execution.attached |      true |
-|             execution.shutdown-on-attached-exit |     false |
-|             execution.state-recovery.claim-mode |  NO_CLAIM |
-| execution.state-recovery.ignore-unclaimed-state |     false |
-|                                execution.target |    remote |
-|                          jobmanager.rpc.address | 
$VAR_JOBMANAGER_RPC_ADDRESS |
-|                                       rest.port |     $VAR_REST_PORT |
-|                              sql-client.verbose |      true |
-+-------------------------------------------------+-----------+
-8 rows in set
++-------------------------------------------------+--------------+
+|                                             key |        value |
++-------------------------------------------------+--------------+
+|                 $internal.deployment.config-dir | /dummy/conf/ |
+|                              execution.attached |         true |
+|             execution.shutdown-on-attached-exit |        false |
+|             execution.state-recovery.claim-mode |     NO_CLAIM |
+| execution.state-recovery.ignore-unclaimed-state |        false |
+|                                execution.target |       remote |
+|                          jobmanager.rpc.address |    
$VAR_JOBMANAGER_RPC_ADDRESS |
+|                                       rest.port |        $VAR_REST_PORT |
+|                              sql-client.verbose |         true |
++-------------------------------------------------+--------------+
+9 rows in set
 !ok
 
 # test reset can work with add jar
@@ -190,19 +194,20 @@ SHOW JARS;
 !ok
 
 set;
-+-------------------------------------------------+-----------+
-|                                             key |     value |
-+-------------------------------------------------+-----------+
-|                              execution.attached |      true |
-|             execution.shutdown-on-attached-exit |     false |
-|             execution.state-recovery.claim-mode |  NO_CLAIM |
-| execution.state-recovery.ignore-unclaimed-state |     false |
-|                                execution.target |    remote |
-|                          jobmanager.rpc.address | 
$VAR_JOBMANAGER_RPC_ADDRESS |
-|                                       rest.port |     $VAR_REST_PORT |
-|                              sql-client.verbose |      true |
-+-------------------------------------------------+-----------+
-8 rows in set
++-------------------------------------------------+--------------+
+|                                             key |        value |
++-------------------------------------------------+--------------+
+|                 $internal.deployment.config-dir | /dummy/conf/ |
+|                              execution.attached |         true |
+|             execution.shutdown-on-attached-exit |        false |
+|             execution.state-recovery.claim-mode |     NO_CLAIM |
+| execution.state-recovery.ignore-unclaimed-state |        false |
+|                                execution.target |       remote |
+|                          jobmanager.rpc.address |    
$VAR_JOBMANAGER_RPC_ADDRESS |
+|                                       rest.port |        $VAR_REST_PORT |
+|                              sql-client.verbose |         true |
++-------------------------------------------------+--------------+
+9 rows in set
 !ok
 
 reset;
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index c19843a4d53..bb69fdc658d 100644
--- 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 
 import javax.annotation.Nullable;
 
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -347,4 +348,24 @@ public interface SqlGatewayService {
             Map<String, String> dynamicOptions,
             Map<String, String> staticPartitions,
             Map<String, String> executionConfig);
+
+    // 
-------------------------------------------------------------------------------------------
+    // Deploy Script
+    // 
-------------------------------------------------------------------------------------------
+
+    /**
+     * Deploy the script in application mode.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param scriptUri URI of the script.
+     * @param script the content of the script.
+     * @param executionConfig to run the script.
+     * @return the cluster ID of the deployed cluster.
+     */
+    <ClusterID> ClusterID deployScript(
+            SessionHandle sessionHandle,
+            @Nullable URI scriptUri,
+            @Nullable String script,
+            Configuration executionConfig)
+            throws SqlGatewayException;
 }
diff --git 
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
index 348cf80df16..d7b45779629 100644
--- 
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
+++ 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.table.gateway.api.session.SessionHandle;
 
 import javax.annotation.Nullable;
 
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -201,6 +202,16 @@ public class MockedSqlGatewayService implements 
SqlGatewayService {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public <ClusterID> ClusterID deployScript(
+            SessionHandle sessionHandle,
+            @Nullable URI scriptUri,
+            @Nullable String script,
+            Configuration executionConfig)
+            throws SqlGatewayException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public ResolvedCatalogBaseTable<?> getTable(
             SessionHandle sessionHandle, ObjectIdentifier tableIdentifier)
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
index 2fa462ade85..e48df09a64d 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint;
+import 
org.apache.flink.table.gateway.rest.handler.application.DeployScriptHandler;
 import 
org.apache.flink.table.gateway.rest.handler.materializedtable.RefreshMaterializedTableHandler;
 import 
org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHandler;
 import 
org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHandler;
@@ -43,6 +44,7 @@ import 
org.apache.flink.table.gateway.rest.handler.statement.ExecuteStatementHan
 import 
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler;
 import org.apache.flink.table.gateway.rest.handler.util.GetApiVersionHandler;
 import org.apache.flink.table.gateway.rest.handler.util.GetInfoHandler;
+import 
org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders;
 import 
org.apache.flink.table.gateway.rest.header.materializedtable.RefreshMaterializedTableHeaders;
 import 
org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHeaders;
 import 
org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHeaders;
@@ -99,6 +101,7 @@ public class SqlGatewayRestEndpoint extends 
RestServerEndpoint implements SqlGat
         addStatementRelatedHandlers(handlers);
         addEmbeddedSchedulerRelatedHandlers(handlers);
         addMaterializedTableRelatedHandlers(handlers);
+        addDeployScriptRelatedHandlers(handlers);
         return handlers;
     }
 
@@ -257,6 +260,14 @@ public class SqlGatewayRestEndpoint extends 
RestServerEndpoint implements SqlGat
                         refreshMaterializedTableHandler));
     }
 
+    private void addDeployScriptRelatedHandlers(
+            List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> 
handlers) {
+        DeployScriptHandler handler =
+                new DeployScriptHandler(
+                        service, responseHeaders, 
DeployScriptHeaders.getInstance());
+        handlers.add(Tuple2.of(DeployScriptHeaders.getInstance(), handler));
+    }
+
     @Override
     protected void startInternal() {
         quartzScheduler.start();
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java
new file mode 100644
index 00000000000..120fb278750
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler.application;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import 
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import 
org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.application.DeployScriptResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler to deploy the script in application mode. */
+public class DeployScriptHandler
+        extends AbstractSqlGatewayRestHandler<
+                DeployScriptRequestBody, DeployScriptResponseBody, 
SessionMessageParameters> {
+
+    public DeployScriptHandler(
+            SqlGatewayService service,
+            Map<String, String> responseHeaders,
+            MessageHeaders<
+                            DeployScriptRequestBody,
+                            DeployScriptResponseBody,
+                            SessionMessageParameters>
+                    messageHeaders) {
+        super(service, responseHeaders, messageHeaders);
+    }
+
+    @Override
+    protected CompletableFuture<DeployScriptResponseBody> handleRequest(
+            @Nullable SqlGatewayRestAPIVersion version,
+            @Nonnull HandlerRequest<DeployScriptRequestBody> request)
+            throws RestHandlerException {
+        return CompletableFuture.completedFuture(
+                new DeployScriptResponseBody(
+                        service.deployScript(
+                                        request.getPathParameter(
+                                                
SessionHandleIdPathParameter.class),
+                                        
request.getRequestBody().getScriptUri() == null
+                                                ? null
+                                                : URI.create(
+                                                        
request.getRequestBody().getScriptUri()),
+                                        request.getRequestBody().getScript(),
+                                        Configuration.fromMap(
+                                                
request.getRequestBody().getExecutionConfig()))
+                                .toString()));
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/application/DeployScriptHeaders.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/application/DeployScriptHeaders.java
new file mode 100644
index 00000000000..54c1ea5f310
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/application/DeployScriptHeaders.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.header.application;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
+import 
org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.application.DeployScriptResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Collection;
+
+/** Message headers for run the script in application mode. */
+public class DeployScriptHeaders
+        implements SqlGatewayMessageHeaders<
+                DeployScriptRequestBody, DeployScriptResponseBody, 
SessionMessageParameters> {
+
+    private static final DeployScriptHeaders INSTANCE = new 
DeployScriptHeaders();
+
+    private static final String URL = "/sessions/:" + 
SessionHandleIdPathParameter.KEY + "/scripts";
+
+    public static DeployScriptHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public Class<DeployScriptResponseBody> getResponseClass() {
+        return DeployScriptResponseBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.OK;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Deploy the script in application mode";
+    }
+
+    @Override
+    public Class<DeployScriptRequestBody> getRequestClass() {
+        return DeployScriptRequestBody.class;
+    }
+
+    @Override
+    public SessionMessageParameters getUnresolvedMessageParameters() {
+        return new SessionMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.POST;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    @Override
+    public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+        return 
SqlGatewayRestAPIVersion.getHigherVersions(SqlGatewayRestAPIVersion.V3);
+    }
+
+    @Override
+    public String operationId() {
+        return "deployScript";
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java
new file mode 100644
index 00000000000..bc30a5b3e10
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.application;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.Map;
+
+/** Request to deploy a script in application mode. */
+public class DeployScriptRequestBody implements RequestBody {
+
+    private static final String FIELD_NAME_SCRIPT = "script";
+    private static final String FIELD_NAME_SCRIPT_URI = "scriptUri";
+    private static final String FIELD_NAME_EXECUTION_CONFIG = 
"executionConfig";
+
+    @JsonProperty(FIELD_NAME_SCRIPT)
+    private final @Nullable String script;
+
+    @JsonProperty(FIELD_NAME_SCRIPT_URI)
+    private final @Nullable String scriptUri;
+
+    @JsonProperty(FIELD_NAME_EXECUTION_CONFIG)
+    private final Map<String, String> executionConfig;
+
+    @JsonCreator
+    public DeployScriptRequestBody(
+            @JsonProperty(FIELD_NAME_SCRIPT) @Nullable String script,
+            @JsonProperty(FIELD_NAME_SCRIPT_URI) @Nullable String scriptUri,
+            @JsonProperty(FIELD_NAME_EXECUTION_CONFIG) @Nullable
+                    Map<String, String> executionConfig) {
+        this.script = script;
+        this.scriptUri = scriptUri;
+        this.executionConfig = executionConfig == null ? 
Collections.emptyMap() : executionConfig;
+    }
+
+    public Map<String, String> getExecutionConfig() {
+        return executionConfig;
+    }
+
+    @Nullable
+    public String getScript() {
+        return script;
+    }
+
+    @Nullable
+    public String getScriptUri() {
+        return scriptUri;
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptResponseBody.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptResponseBody.java
new file mode 100644
index 00000000000..3f0126eae25
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptResponseBody.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.application;
+
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response about the cluster that runs the application. */
+public class DeployScriptResponseBody implements ResponseBody {
+
+    private static final String FIELD_NAME_CLUSTER_ID = "clusterID";
+
+    @JsonProperty(FIELD_NAME_CLUSTER_ID)
+    private final String clusterID;
+
+    @JsonCreator
+    public DeployScriptResponseBody(@JsonProperty(FIELD_NAME_CLUSTER_ID) 
String clusterID) {
+        this.clusterID = clusterID;
+    }
+
+    public String getClusterID() {
+        return clusterID;
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index e88f0212362..371d8aa8993 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.gateway.service;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.catalog.CatalogBaseTable.TableKind;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -42,12 +45,16 @@ import 
org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.session.Session;
 import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.runtime.application.SqlDriver;
+import org.apache.flink.util.StringUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -330,6 +337,46 @@ public class SqlGatewayServiceImpl implements 
SqlGatewayService {
         }
     }
 
+    @Override
+    public <ClusterID> ClusterID deployScript(
+            SessionHandle sessionHandle,
+            @Nullable URI scriptUri,
+            @Nullable String script,
+            Configuration executionConfig)
+            throws SqlGatewayException {
+        Session session = sessionManager.getSession(sessionHandle);
+        if (scriptUri == null && script == null) {
+            throw new IllegalArgumentException("Please specify script content 
or uri.");
+        }
+        if (scriptUri != null && !StringUtils.isNullOrWhitespaceOnly(script)) {
+            throw new IllegalArgumentException(
+                    "Please specify either the script uri or the script 
content, but not both.");
+        }
+        Configuration mergedConfig = 
Configuration.fromMap(session.getSessionConfig());
+        mergedConfig.addAll(executionConfig);
+
+        List<String> arguments = new ArrayList<>();
+        if (scriptUri != null) {
+            arguments.add("--" + SqlDriver.OPTION_SQL_FILE.getLongOpt());
+            arguments.add(scriptUri.toString());
+        }
+        if (script != null) {
+            arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+            arguments.add(script);
+        }
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            return new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                    .run(mergedConfig, applicationConfiguration);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to 
cluster.", t);
+        }
+    }
+
     @Override
     public Set<FunctionInfo> listUserDefinedFunctions(
             SessionHandle sessionHandle, String catalogName, String 
databaseName)
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
index eae416cd0a6..78eccc7337d 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java
@@ -25,6 +25,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.cli.ProgramOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
@@ -141,6 +142,7 @@ public class DefaultContext {
 
         // 2. load the global configuration
         Configuration configuration = 
GlobalConfiguration.loadConfiguration(flinkConfigDir);
+        configuration.set(DeploymentOptionsInternal.CONF_DIR, flinkConfigDir);
         configuration.addAll(dynamicConfig);
 
         // 3. load the custom command lines
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java
new file mode 100644
index 00000000000..3ed3c80c169
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.ClusterClientProvider;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.streaming.api.graph.ExecutionPlan;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import 
org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders;
+import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
+import 
org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
+import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
+import org.apache.flink.table.gateway.service.utils.MockHttpServer;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.table.runtime.application.SqlDriver;
+import org.apache.flink.util.FileUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase to test deploy the script into application mode. */
+public class DeployScriptITCase {
+
+    @RegisterExtension
+    @Order(1)
+    public static final SqlGatewayServiceExtension 
SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(Configuration::new);
+
+    @RegisterExtension
+    @Order(2)
+    private static final SqlGatewayRestEndpointExtension 
SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
+            new 
SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
+
+    private static TestingRestClient restClient;
+    private static SessionHandle sessionHandle;
+    private static final String script =
+            "CREATE TEMPORARY TABLE sink(\n"
+                    + "  a INT\n"
+                    + ") WITH (\n"
+                    + "  'connector' = 'blackhole'\n"
+                    + ");\n"
+                    + "INSERT INTO sink VALUES (1);";
+
+    @BeforeAll
+    static void beforeAll() throws Exception {
+        restClient = TestingRestClient.getTestingRestClient();
+        sessionHandle =
+                new SessionHandle(
+                        UUID.fromString(
+                                restClient
+                                        .sendRequest(
+                                                
SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                                                        .getTargetAddress(),
+                                                
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
+                                                
OpenSessionHeaders.getInstance(),
+                                                
EmptyMessageParameters.getInstance(),
+                                                new OpenSessionRequestBody(
+                                                        "test",
+                                                        
Collections.singletonMap("key", "value")))
+                                        .get()
+                                        .getSessionHandle()));
+    }
+
+    @Test
+    void testDeployScriptToYarnCluster(@TempDir Path workDir) throws Exception 
{
+        verifyDeployScriptToCluster("yarn-application", script, null, script);
+        try (MockHttpServer server = MockHttpServer.startHttpServer()) {
+            File file = workDir.resolve("script.sql").toFile();
+            assertThat(file.createNewFile()).isTrue();
+            FileUtils.writeFileUtf8(file, script);
+            URL url = server.prepareResource("/download/script.sql", file);
+            verifyDeployScriptToCluster("yarn-application", null, 
url.toString(), script);
+        }
+    }
+
+    @Test
+    void testDeployScriptToKubernetesCluster(@TempDir Path workDir) throws 
Exception {
+        verifyDeployScriptToCluster("kubernetes-application", script, null, 
script);
+        try (MockHttpServer server = MockHttpServer.startHttpServer()) {
+            File file = workDir.resolve("script.sql").toFile();
+            assertThat(file.createNewFile()).isTrue();
+            FileUtils.writeFileUtf8(file, script);
+            URL url = server.prepareResource("/download/script.sql", file);
+            verifyDeployScriptToCluster("kubernetes-application", null, 
url.toString(), script);
+        }
+    }
+
+    private void verifyDeployScriptToCluster(
+            String target, @Nullable String script, @Nullable String 
scriptUri, String content)
+            throws Exception {
+        TestApplicationClusterClientFactory.id = target;
+
+        assertThat(
+                        restClient
+                                .sendRequest(
+                                        
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
+                                        
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
+                                        DeployScriptHeaders.getInstance(),
+                                        new 
SessionMessageParameters(sessionHandle),
+                                        new DeployScriptRequestBody(
+                                                script,
+                                                scriptUri,
+                                                Collections.singletonMap(
+                                                        
DeploymentOptions.TARGET.key(), target)))
+                                .get()
+                                .getClusterID())
+                .isEqualTo("test");
+        ApplicationConfiguration config = 
TestApplicationClusterDescriptor.applicationConfiguration;
+        
assertThat(TestApplicationClusterClientFactory.configuration.getString("key", 
"none"))
+                .isEqualTo("value");
+        
assertThat(config.getApplicationClassName()).isEqualTo(SqlDriver.class.getName());
+        
assertThat(SqlDriver.parseOptions(config.getProgramArguments())).isEqualTo(content);
+    }
+
+    /**
+     * Test {@link ClusterClientFactory} to capture {@link Configuration} and 
{@link
+     * ApplicationConfiguration}.
+     */
+    @SuppressWarnings({"unchecked", "rawTypes"})
+    public static class TestApplicationClusterClientFactory<ClusterID>
+            implements ClusterClientFactory {
+
+        public static String id;
+
+        private static volatile Configuration configuration;
+
+        @Override
+        public boolean isCompatibleWith(Configuration configuration) {
+            return Objects.equals(id, 
configuration.get(DeploymentOptions.TARGET));
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public ClusterDescriptor<ClusterID> 
createClusterDescriptor(Configuration configuration) {
+            TestApplicationClusterClientFactory.configuration = configuration;
+            return TestApplicationClusterDescriptor.INSTANCE;
+        }
+
+        @Override
+        @Nullable
+        public String getClusterId(Configuration configuration) {
+            return "test-application";
+        }
+
+        @Override
+        public ClusterSpecification getClusterSpecification(Configuration 
configuration) {
+            return new ClusterSpecification.ClusterSpecificationBuilder()
+                    .createClusterSpecification();
+        }
+    }
+
+    private static class TestApplicationClusterDescriptor<T> implements 
ClusterDescriptor<T> {
+
+        @SuppressWarnings("rawTypes")
+        private static final TestApplicationClusterDescriptor INSTANCE =
+                new TestApplicationClusterDescriptor<>();
+
+        static volatile ApplicationConfiguration applicationConfiguration;
+
+        private TestApplicationClusterDescriptor() {}
+
+        @Override
+        public String getClusterDescription() {
+            return "Test Application Cluster Descriptor";
+        }
+
+        @Override
+        public ClusterClientProvider<T> retrieve(T clusterId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ClusterClientProvider<T> deploySessionCluster(
+                ClusterSpecification clusterSpecification) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public ClusterClientProvider<T> deployApplicationCluster(
+                final ClusterSpecification clusterSpecification,
+                final ApplicationConfiguration applicationConfiguration) {
+            TestApplicationClusterDescriptor.applicationConfiguration = 
applicationConfiguration;
+            return new ClusterClientProvider<T>() {
+                @Override
+                public ClusterClient<T> getClusterClient() {
+                    return new TestClusterClient();
+                }
+            };
+        }
+
+        @Override
+        public ClusterClientProvider<T> deployJobCluster(
+                ClusterSpecification clusterSpecification, JobGraph jobGraph, 
boolean detached) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void killCluster(T clusterId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void close() {
+            // nothing to do
+        }
+    }
+
+    @SuppressWarnings("rawTypes")
+    private static class TestClusterClient implements ClusterClient {
+
+        @Override
+        public void close() {}
+
+        @Override
+        public Object getClusterId() {
+            return "test";
+        }
+
+        @Override
+        public Configuration getFlinkConfiguration() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void shutDownCluster() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public String getWebInterfaceURL() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<JobID> submitJob(ExecutionPlan executionPlan) 
{
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<JobStatus> getJobStatus(JobID jobId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<JobResult> requestJobResult(JobID jobId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Map<String, Object>> getAccumulators(
+                JobID jobID, ClassLoader loader) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Acknowledge> cancel(JobID jobId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<String> cancelWithSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, 
SavepointFormatType formatType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<String> stopWithSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                SavepointFormatType formatType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<String> stopWithDetachedSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @Nullable String savepointDirectory,
+                SavepointFormatType formatType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<String> triggerSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, 
SavepointFormatType formatType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Long> triggerCheckpoint(
+                JobID jobId, CheckpointType checkpointType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<String> triggerDetachedSavepoint(
+                JobID jobId, @Nullable String savepointDirectory, 
SavepointFormatType formatType) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<CoordinationResponse> sendCoordinationRequest(
+                JobID jobId, String operatorUid, CoordinationRequest request) {
+            throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java
index 5ddfeed5062..4eb91e0a44b 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java
@@ -141,7 +141,7 @@ class ScriptRunnerITCase {
                         udfJar.getAbsolutePath(), GENERATED_LOWER_UDF_CLASS);
 
         List<String> arguments =
-                Arrays.asList("--scriptPath", createStatementFile(workDir, 
script).toString());
+                Arrays.asList("--scriptUri", createStatementFile(workDir, 
script).toString());
         runScriptInCluster(arguments);
 
         assertThat(TestValuesTableFactory.getResultsAsStrings("sink"))
@@ -166,7 +166,7 @@ class ScriptRunnerITCase {
 
         try (MockHttpServer server = MockHttpServer.startHttpServer()) {
             URL url = server.prepareResource("/download/script.sql", file);
-            List<String> arguments = Arrays.asList("--scriptPath", 
url.toString());
+            List<String> arguments = Arrays.asList("--scriptUri", 
url.toString());
             runScriptInCluster(arguments);
         }
     }
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
index 47b16a0baf0..9b0690b70e9 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.gateway.service.utils;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
 import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
@@ -35,6 +36,7 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -89,7 +91,12 @@ public class SqlGatewayServiceExtension implements 
BeforeAllCallback, AfterAllCa
             sessionManager =
                     sessionManagerCreator.apply(
                             DefaultContext.load(
-                                    new Configuration(), 
Collections.emptyList(), true));
+                                    Configuration.fromMap(
+                                            Collections.singletonMap(
+                                                    
DeploymentOptionsInternal.CONF_DIR.key(),
+                                                    "/dummy/conf/")),
+                                    Collections.emptyList(),
+                                    true));
         } finally {
             CommonTestUtils.setEnv(originalEnv);
         }
@@ -114,6 +121,10 @@ public class SqlGatewayServiceExtension implements 
BeforeAllCallback, AfterAllCa
         return sessionManager;
     }
 
+    public String getConfDir() {
+        return Paths.get(temporaryFolder.getRoot().getAbsolutePath(), 
"conf").toString();
+    }
+
     private String getFlinkConfContent(Map<String, String> flinkConf) {
         StringBuilder sb = new StringBuilder();
         flinkConf.forEach((k, v) -> sb.append(k).append(": 
").append(v).append("\n"));
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
 
b/flink-table/flink-sql-gateway/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
new file mode 100644
index 00000000000..9ea49fe13be
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.gateway.rest.DeployScriptITCase$TestApplicationClusterClientFactory
\ No newline at end of file
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/set.q 
b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q
index f96ba188492..902e58e3bbb 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q
@@ -27,18 +27,19 @@ reset table.resources.download-dir;
 
 set;
 !output
-+-------------------------------------------------+-----------+
-|                                             key |     value |
-+-------------------------------------------------+-----------+
-|                              execution.attached |      true |
-|             execution.shutdown-on-attached-exit |     false |
-|             execution.state-recovery.claim-mode |  NO_CLAIM |
-| execution.state-recovery.ignore-unclaimed-state |     false |
-|                                execution.target |    remote |
-|                          jobmanager.rpc.address | localhost |
-|                                       rest.port |     $VAR_REST_PORT |
-+-------------------------------------------------+-----------+
-7 rows in set
++-------------------------------------------------+--------------+
+|                                             key |        value |
++-------------------------------------------------+--------------+
+|                 $internal.deployment.config-dir | /dummy/conf/ |
+|                              execution.attached |         true |
+|             execution.shutdown-on-attached-exit |        false |
+|             execution.state-recovery.claim-mode |     NO_CLAIM |
+| execution.state-recovery.ignore-unclaimed-state |        false |
+|                                execution.target |       remote |
+|                          jobmanager.rpc.address |    localhost |
+|                                       rest.port |        $VAR_REST_PORT |
++-------------------------------------------------+--------------+
+8 rows in set
 !ok
 
 # set illegal value
@@ -49,16 +50,17 @@ java.lang.IllegalArgumentException: No enum constant 
org.apache.flink.table.api.
 
 set;
 !output
-+-------------------------------------------------+-----------+
-|                                             key |     value |
-+-------------------------------------------------+-----------+
-|                              execution.attached |      true |
-|             execution.shutdown-on-attached-exit |     false |
-|             execution.state-recovery.claim-mode |  NO_CLAIM |
-| execution.state-recovery.ignore-unclaimed-state |     false |
-|                                execution.target |    remote |
-|                          jobmanager.rpc.address | localhost |
-|                                       rest.port |     $VAR_REST_PORT |
-+-------------------------------------------------+-----------+
-7 rows in set
++-------------------------------------------------+--------------+
+|                                             key |        value |
++-------------------------------------------------+--------------+
+|                 $internal.deployment.config-dir | /dummy/conf/ |
+|                              execution.attached |         true |
+|             execution.shutdown-on-attached-exit |        false |
+|             execution.state-recovery.claim-mode |     NO_CLAIM |
+| execution.state-recovery.ignore-unclaimed-state |        false |
+|                                execution.target |       remote |
+|                          jobmanager.rpc.address |    localhost |
+|                                       rest.port |        $VAR_REST_PORT |
++-------------------------------------------------+--------------+
+8 rows in set
 !ok
diff --git 
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot
 
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot
index f4d14c6089f..8da82fac080 100644
--- 
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot
+++ 
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot
@@ -341,6 +341,46 @@
         }
       }
     }
+  }, {
+    "url" : "/sessions/:session_handle/scripts",
+    "method" : "POST",
+    "status-code" : "200 OK",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "session_handle"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:application:DeployScriptRequestBody",
+      "properties" : {
+        "script" : {
+          "type" : "string"
+        },
+        "scriptUri" : {
+          "type" : "string"
+        },
+        "executionConfig" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "string"
+          }
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : 
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:application:DeployScriptResponseBody",
+      "properties" : {
+        "clusterID" : {
+          "type" : "string"
+        }
+      }
+    }
   }, {
     "url" : "/sessions/:session_handle/statements",
     "method" : "POST",
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java
index 2e01fbcad81..5c22f6b9d73 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java
@@ -62,12 +62,12 @@ public class SqlDriver {
 
     public static final Option OPTION_SQL_FILE =
             Option.builder()
-                    .longOpt("scriptPath")
+                    .longOpt("scriptUri")
                     .numberOfArgs(1)
-                    .desc("SQL script file path. It supports to fetch files 
from the DFS or HTTP.")
+                    .desc("SQL script file URI. It supports to fetch files 
from the DFS or HTTP.")
                     .build();
 
-    public static final Option OPTION_SQL_STATEMENTS =
+    public static final Option OPTION_SQL_SCRIPT =
             Option.builder().longOpt("script").numberOfArgs(1).desc("Script 
content.").build();
 
     private static final String RUNNER_CLASS_NAME =
@@ -146,7 +146,7 @@ public class SqlDriver {
     static Options getSqlDriverOptions() {
         Options options = new Options();
         options.addOption(OPTION_SQL_FILE);
-        options.addOption(OPTION_SQL_STATEMENTS);
+        options.addOption(OPTION_SQL_SCRIPT);
         return options;
     }
 
@@ -161,12 +161,12 @@ public class SqlDriver {
             String content = 
getContent(line.getOptionValue(OPTION_SQL_FILE.getLongOpt()));
             if (content == null) {
                 return Preconditions.checkNotNull(
-                        
line.getOptionValue(OPTION_SQL_STATEMENTS.getLongOpt()),
-                        "Please use \"--script\" or \"--scriptPath\" to 
specify script either.");
+                        line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()),
+                        "Please use \"--script\" or \"--scriptUri\" to specify 
script either.");
             } else {
                 Preconditions.checkArgument(
-                        
line.getOptionValue(OPTION_SQL_STATEMENTS.getLongOpt()) == null,
-                        "Don't set \"--script\" or \"--scriptPath\" 
together.");
+                        line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()) == 
null,
+                        "Don't set \"--script\" or \"--scriptUri\" together.");
                 return content;
             }
         } catch (ParseException | URISyntaxException e) {

Reply via email to