This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4a2535cb8ab9f4ef807e98e8b92a6f921960c059
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Sun Jan 19 21:10:30 2025 +0800

    [FLINK-37133][table] Support Submitting Refresh Job of Materialized Table to Yarn/K8s
---
 .../client/deployment/StandaloneClusterId.java     |   5 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   1 +
 .../test_kubernetes_materialized_table.sh          | 241 +++++++++++++++++++++
 .../MaterializedTableManager.java                  | 195 ++++++++++++++---
 .../service/operation/OperationExecutor.java       |   8 +
 .../catalog/CatalogBaseTableResolutionTest.java    |   3 +-
 .../table/refresh/ContinuousRefreshHandler.java    |  21 +-
 7 files changed, 433 insertions(+), 41 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
index 6ebfc7201b99..402dc98fba40 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
@@ -27,4 +27,9 @@ public class StandaloneClusterId {
     public static StandaloneClusterId getInstance() {
         return INSTANCE;
     }
+
+    @Override
+    public String toString() {
+        return "StandaloneClusterId";
+    }
 }
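
Note: the toString() added above is what gets stored as the cluster id string for standalone/remote sessions. Below is a minimal sketch (not part of this commit; the wrapper class name is made up) of how that string reaches the new ContinuousRefreshHandler:

    // Sketch only: assumes an OperationExecutor for a session with execution.target=remote
    // and a job id obtained from the submitted refresh statement.
    import java.util.Optional;

    import org.apache.flink.table.gateway.service.operation.OperationExecutor;
    import org.apache.flink.table.refresh.ContinuousRefreshHandler;

    class RefreshHandlerSketch {
        static ContinuousRefreshHandler build(OperationExecutor executor, String jobId) {
            // getSessionClusterId() (added in OperationExecutor further down) stringifies the
            // id resolved by the ClusterClientFactory; for a standalone session this is the
            // new StandaloneClusterId#toString(), i.e. "StandaloneClusterId".
            Optional<String> clusterId = executor.getSessionClusterId();
            return new ContinuousRefreshHandler(
                    "remote", clusterId.orElse("StandaloneClusterId"), jobId);
        }
    }
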
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 1eb7d49405d4..92304ef616a0 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -180,6 +180,7 @@ function run_group_2 {
     run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
     run_test "Sql Jdbc Driver end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_jdbc_driver.sh" "skip_check_exceptions"
     run_test "Run kubernetes SQL application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_sql_application.sh"
+    run_test "Run kubernetes Materialized Table test" "$END_TO_END_DIR/test-scripts/test_kubernetes_materialized_table.sh"

     run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
     run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"
diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh
new file mode 100755
index 000000000000..addc04f28bda
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh
@@ -0,0 +1,241 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script aims to test Materialized Table and Kubernetes integration.
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+FLINK_IMAGE_NAME="test_kubernetes_mt_application-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+ # build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}/test_materialized_table-$(uuidgen)"
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {
+    kubectl delete deployment ${APPLICATION_CLUSTER_ID}
+    kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+function open_session() {
+  local session_options=$1
+
+  if [ -z "$session_options" ]; then
+    session_options="{}"
+  fi
+
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"properties\": $session_options}" \
+    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions"; | jq -r '.sessionHandle'
+}
+
+function configure_session() {
+  local session_handle=$1
+  local statement=$2
+
+  response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
+
+  if [ "$response" != "{}" ]; then
+    echo "Configure session $session_handle $statement failed: $response"
+    exit 1
+  fi
+  echo "Configured session $session_handle $statement successfully"
+}
+
+function execute_statement() {
+  local session_handle=$1
+  local statement=$2
+
+  local response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements)
+
+  local operation_handle=$(echo $response | jq -r '.operationHandle')
+  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
+    echo "Failed to execute statement: $statement, response: $response"
+    exit 1
+  fi
+  get_operation_result $session_handle $operation_handle
+}
+
+function get_operation_result() {
+  local session_handle=$1
+  local operation_handle=$2
+
+  local fields_array=()
+  local 
next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
+  while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
+  do
+    response=$(curl -s -X GET \
+      -H "Content-Type:  \
+       application/json" \
+      http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri)
+    result_type=$(echo $response | jq -r '.resultType')
+    result_kind=$(echo $response | jq -r '.resultKind')
+    next_uri=$(echo $response | jq -r '.nextResultUri')
+    errors=$(echo $response | jq -r '.errors')
+    if [ "$errors" != "null" ]; then
+      echo "fetch operation $operation_handle failed: $errors"
+      exit 1
+    fi
+    if [ "$result_kind" == "SUCCESS" ]; then
+      fields_array+="SUCCESS"
+      break;
+    fi
+    if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == 
"SUCCESS_WITH_CONTENT" ]; then
+      new_fields=$(echo $response | jq -r '.results.data[].fields')
+      fields_array+=$new_fields
+    else
+      sleep 1
+    fi
+  done
+  echo $fields_array
+}
+
+function create_materialized_table_in_continuous_mode() {
+  local session_handle=$1
+  local table_name=$2
+  local operation_handle=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
+        PARTITIONED BY (ds) \
+        with (\
+          'format' = 'json',\
+          'sink.rolling-policy.rollover-interval' = '10s',\
+          'sink.rolling-policy.check-interval' = '10s'\
+        )\
+        FRESHNESS = INTERVAL '10' SECOND \
+        AS SELECT \
+          DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
+          \`user\`, \
+          \`type\` \
+        FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') 
*\/ \"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | 
jq -r '.operationHandle')
+  get_operation_result $session_handle $operation_handle
+}
+
+function create_filesystem_source() {
+  local session_handle=$1
+  local table_name=$2
+  local path=$3
+
+  create_source_result=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE TABLE $table_name (\
+      \`timestamp\` TIMESTAMP_LTZ(3),\
+      \`user\` STRING,\
+      \`type\` STRING\
+    ) WITH (\
+      'format' = 'csv'\
+    )\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)
+
+    echo $create_source_result
+}
+
+S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}
+echo "[INFO] Create a new session"
+session_options="{\"table.catalog-store.kind\": \"file\",
+                 \"table.catalog-store.file.path\": 
\"$MATERIALIZED_TABLE_DATA_DIR/\",
+                 \"execution.target\": \"kubernetes-application\",
+                 \"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
+                 \"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
+                 \"jobmanager.memory.process.size\": \"1088m\",
+                 \"taskmanager.memory.process.size\": \"1000m\",
+                 \"kubernetes.jobmanager.cpu\": \"0.5\",
+                 \"kubernetes.taskmanager.cpu\": \"0.5\",
+                 \"kubernetes.rest-service.exposed.type\": \"NodePort\",
+                 \"s3.endpoint\": \"$S3_ENDPOINT\",
+                 \"workflow-scheduler.type\": \"embedded\"}"
+
+session_handle=$(open_session "$session_options")
+echo "[INFO] Session Handle $session_handle"
+
+# prepare catalog & database
+echo "[INFO] Create catalog & database"
+configure_session "$session_handle" "create catalog if not exists test_catalog 
with (\
+                                                'type' = 'test-filesystem',\
+                                                'default-database' = 
'test_db',\
+                                                'path' = 
'$MATERIALIZED_TABLE_DATA_DIR/'\
+                                    )"
+configure_session $session_handle "use catalog test_catalog"
+configure_session $session_handle "create database if not exists test_db"
+create_filesystem_source $session_handle "filesystem_source"
+
+# 1. create materialized table in continuous mode
+echo "[INFO] Create Materialized table in continuous mode"
+create_materialized_table_in_continuous_mode $session_handle "my_materialized_table_in_continuous_mode"
+echo "[INFO] Wait deployment ${APPLICATION_CLUSTER_ID} Available"
+kubectl wait --for=condition=Available --timeout=30s 
deploy/${APPLICATION_CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods 
--selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o 
jsonpath='{..metadata.name}')
+echo "[INFO] Wait first checkpoint finished"
+wait_num_checkpoints $jm_pod_name 1
+
+# 2. suspend & resume materialized table in continuous mode
+echo "[INFO] Suspend materialized table"
+configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' 
= '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
+execute_statement $session_handle "alter materialized table 
my_materialized_table_in_continuous_mode suspend"
+
+kubectl wait --for=delete deployment/$APPLICATION_CLUSTER_ID
+
+echo "[INFO] Resume materialized table"
+execute_statement $session_handle "alter materialized table 
my_materialized_table_in_continuous_mode resume"
+kubectl wait --for=condition=Available --timeout=30s 
deploy/${APPLICATION_CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods 
--selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o 
jsonpath='{..metadata.name}')
+echo "[INFO] Wait resumed job finished the first checkpoint"
+wait_num_checkpoints $jm_pod_name 1
+
+# 3. verify resumed continuous job is restored from savepoint
+mkdir -p "$LOCAL_LOGS_PATH"
+kubectl logs $jm_pod_name > $LOCAL_LOGS_PATH/jobmanager.log
+grep -E "Starting job [A-Za-z0-9]+ from savepoint" 
$LOCAL_LOGS_PATH/jobmanager.log
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 2c2ea78f1003..14ad047ebfb0 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -20,12 +20,17 @@ package org.apache.flink.table.gateway.service.materializedtable;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import 
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
@@ -42,6 +47,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions;
 import org.apache.flink.table.gateway.service.operation.OperationExecutor;
@@ -61,6 +67,7 @@ import org.apache.flink.table.refresh.ContinuousRefreshHandler;
 import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
 import org.apache.flink.table.refresh.RefreshHandler;
 import org.apache.flink.table.refresh.RefreshHandlerSerializer;
+import org.apache.flink.table.runtime.application.SqlDriver;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow;
 import org.apache.flink.table.workflow.CreateRefreshWorkflow;
@@ -95,6 +102,7 @@ import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRE
 import static org.apache.flink.configuration.DeploymentOptions.TARGET;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static 
org.apache.flink.configuration.PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID;
 import static 
org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH;
 import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER;
 import static 
org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS;
@@ -337,12 +345,12 @@ public class MaterializedTableManager {
                                 tableIdentifier, refreshHandler.getJobId()));
             }
 
-            String savepointPath =
-                    stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler.getJobId());
+            String savepointPath = stopJobWithSavepoint(operationExecutor, 
handle, refreshHandler);
 
             ContinuousRefreshHandler updateRefreshHandler =
                     new ContinuousRefreshHandler(
                             refreshHandler.getExecutionTarget(),
+                            refreshHandler.getClusterId(),
                             refreshHandler.getJobId(),
                             savepointPath);
 
@@ -565,17 +573,12 @@ public class MaterializedTableManager {
                         materializedTableIdentifier,
                         catalogMaterializedTable.getDefinitionQuery(),
                         dynamicOptions);
-        // submit flink streaming job
-        ResultFetcher resultFetcher =
-                operationExecutor.executeStatement(handle, customConfig, 
insertStatement);
 
-        // get execution.target and jobId, currently doesn't support yarn and 
k8s, so doesn't
-        // get clusterId
-        List<RowData> results = fetchAllResults(resultFetcher);
-        String jobId = results.get(0).getString(0).toString();
-        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        JobExecutionResult result =
+                executeRefreshJob(insertStatement, customConfig, 
operationExecutor, handle);
         ContinuousRefreshHandler continuousRefreshHandler =
-                new ContinuousRefreshHandler(executeTarget, jobId);
+                new ContinuousRefreshHandler(
+                        result.executionTarget, result.clusterId, 
result.jobId);
         byte[] serializedBytes = 
serializeContinuousHandler(continuousRefreshHandler);
 
         updateRefreshHandler(
@@ -659,17 +662,19 @@ public class MaterializedTableManager {
                     "Begin to refreshing the materialized table {}, statement: 
{}",
                     materializedTableIdentifier,
                     insertStatement);
-            ResultFetcher resultFetcher =
-                    operationExecutor.executeStatement(handle, customConfig, 
insertStatement);
+            JobExecutionResult result =
+                    executeRefreshJob(insertStatement, customConfig, 
operationExecutor, handle);
 
-            List<RowData> results = fetchAllResults(resultFetcher);
-            String jobId = results.get(0).getString(0).toString();
-            String executeTarget =
-                    
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
             Map<StringData, StringData> clusterInfo = new HashMap<>();
             clusterInfo.put(
-                    StringData.fromString(TARGET.key()), 
StringData.fromString(executeTarget));
-            // TODO get clusterId
+                    StringData.fromString(TARGET.key()),
+                    StringData.fromString(result.executionTarget));
+            Optional<String> clusterIdKeyName = 
getClusterIdKeyName(result.executionTarget);
+            clusterIdKeyName.ifPresent(
+                    s ->
+                            clusterInfo.put(
+                                    StringData.fromString(s),
+                                    StringData.fromString(result.clusterId)));
 
             return ResultFetcher.fromResults(
                     handle,
@@ -680,7 +685,7 @@ public class MaterializedTableManager {
                                     DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))),
                     Collections.singletonList(
                             GenericRowData.of(
-                                    StringData.fromString(jobId),
+                                    StringData.fromString(result.jobId),
                                     new GenericMapData(clusterInfo))));
         } catch (Exception e) {
             throw new SqlExecutionException(
@@ -895,7 +900,7 @@ public class MaterializedTableManager {
             // handler)
             List<MaterializedTableChange> tableChanges = new 
ArrayList<>(op.getTableChanges());
             TableChange.ModifyRefreshHandler modifyRefreshHandler =
-                    genereateResetSavepointTableChange(
+                    generateResetSavepointTableChange(
                             
oldMaterializedTable.getSerializedRefreshHandler());
             tableChanges.add(modifyRefreshHandler);
 
@@ -953,18 +958,19 @@ public class MaterializedTableManager {
                 op.getTableIdentifier(), rollbackChanges, 
oldMaterializedTable);
     }
 
-    private TableChange.ModifyRefreshHandler 
genereateResetSavepointTableChange(
+    private TableChange.ModifyRefreshHandler generateResetSavepointTableChange(
             byte[] serializedContinuousHandler) {
         ContinuousRefreshHandler continuousRefreshHandler =
                 deserializeContinuousHandler(serializedContinuousHandler);
-        ContinuousRefreshHandler resetedRefreshHandler =
+        ContinuousRefreshHandler resetContinuousRefreshHandler =
                 new ContinuousRefreshHandler(
                         continuousRefreshHandler.getExecutionTarget(),
+                        continuousRefreshHandler.getClusterId(),
                         continuousRefreshHandler.getJobId());
 
         return TableChange.modifyRefreshHandler(
-                resetedRefreshHandler.asSummaryString(),
-                serializeContinuousHandler(resetedRefreshHandler));
+                resetContinuousRefreshHandler.asSummaryString(),
+                serializeContinuousHandler(resetContinuousRefreshHandler));
     }
 
     private ResultFetcher callDropMaterializedTableOperation(
@@ -1024,7 +1030,7 @@ public class MaterializedTableManager {
         JobStatus jobStatus = getJobStatus(operationExecutor, handle, 
refreshHandler);
         if (!jobStatus.isTerminalState()) {
             try {
-                cancelJob(operationExecutor, handle, 
refreshHandler.getJobId());
+                cancelJob(operationExecutor, handle, refreshHandler);
             } catch (Exception e) {
                 jobStatus = getJobStatus(operationExecutor, handle, 
refreshHandler);
                 if (!jobStatus.isTerminalState()) {
@@ -1114,7 +1120,7 @@ public class MaterializedTableManager {
             ContinuousRefreshHandler refreshHandler) {
         ResultFetcher resultFetcher =
                 operationExecutor.callDescribeJobOperation(
-                        operationExecutor.getTableEnvironment(),
+                        getTableEnvironment(operationExecutor, refreshHandler),
                         handle,
                         new DescribeJobOperation(refreshHandler.getJobId()));
         List<RowData> result = fetchAllResults(resultFetcher);
@@ -1123,31 +1129,51 @@ public class MaterializedTableManager {
     }
 
     private static void cancelJob(
-            OperationExecutor operationExecutor, OperationHandle handle, 
String jobId) {
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
         operationExecutor.callStopJobOperation(
-                operationExecutor.getTableEnvironment(),
+                getTableEnvironment(operationExecutor, refreshHandler),
                 handle,
-                new StopJobOperation(jobId, false, false));
+                new StopJobOperation(refreshHandler.getJobId(), false, false));
     }
 
     private static String stopJobWithSavepoint(
-            OperationExecutor executor, OperationHandle handle, String jobId) {
+            OperationExecutor executor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
         // check savepoint dir is configured
         Optional<String> savepointDir =
                 
executor.getSessionContext().getSessionConf().getOptional(SAVEPOINT_DIRECTORY);
-        if (!savepointDir.isPresent()) {
+        if (savepointDir.isEmpty()) {
             throw new ValidationException(
                     "Savepoint directory is not configured, can't stop job 
with savepoint.");
         }
+        String jobId = refreshHandler.getJobId();
         ResultFetcher resultFetcher =
                 executor.callStopJobOperation(
-                        executor.getTableEnvironment(),
+                        getTableEnvironment(executor, refreshHandler),
                         handle,
                         new StopJobOperation(jobId, true, false));
         List<RowData> results = fetchAllResults(resultFetcher);
         return results.get(0).getString(0).toString();
     }
 
+    private static TableEnvironmentInternal getTableEnvironment(
+            OperationExecutor executor, ContinuousRefreshHandler 
refreshHandler) {
+
+        String target = refreshHandler.getExecutionTarget();
+        Configuration sessionConfiguration = new Configuration();
+        sessionConfiguration.set(TARGET, target);
+        Optional<String> clusterIdKeyName = getClusterIdKeyName(target);
+        clusterIdKeyName.ifPresent(
+                s -> sessionConfiguration.setString(s, 
refreshHandler.getClusterId()));
+
+        return executor.getTableEnvironment(
+                executor.getSessionContext().getSessionState().resourceManager,
+                sessionConfiguration);
+    }
+
     private ContinuousRefreshHandler deserializeContinuousHandler(byte[] 
serializedRefreshHandler) {
         try {
             return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
@@ -1244,4 +1270,107 @@ public class MaterializedTableManager {
         }
         return results;
     }
+
+    private static JobExecutionResult executeRefreshJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        if (executeTarget == null || executeTarget.isEmpty() || 
"local".equals(executeTarget)) {
+            String errorMessage =
+                    String.format(
+                            "Unsupported execution target detected: %s."
+                                    + "Currently, only the following execution 
targets are supported: "
+                                    + "'remote', 'yarn-session', 
'yarn-application', 'kubernetes-session', 'kubernetes-application'. ",
+                            executeTarget);
+            LOG.error(errorMessage);
+            throw new ValidationException(errorMessage);
+        }
+
+        if (executeTarget.endsWith("application")) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeNonApplicationJob(
+                    script, executionConfig, operationExecutor, 
operationHandle);
+        }
+    }
+
+    private static JobExecutionResult executeNonApplicationJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        String clusterId =
+                operationExecutor
+                        .getSessionClusterId()
+                        .orElseThrow(
+                                () -> {
+                                    String errorMessage =
+                                            String.format(
+                                                    "No cluster ID found when 
executing materialized table refresh job. Execution target is : %s",
+                                                    executeTarget);
+                                    LOG.error(errorMessage);
+                                    return new 
ValidationException(errorMessage);
+                                });
+
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
executionConfig, script);
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+
+        return new JobExecutionResult(executeTarget, clusterId, jobId);
+    }
+
+    private static JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(mergedConfig.get(TARGET), clusterId, 
jobId.toString());
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script {} to application cluster.", 
script, t);
+            throw new SqlGatewayException("Failed to deploy script to 
cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+
+        private final String executionTarget;
+        private final String clusterId;
+        private final String jobId;
+
+        public JobExecutionResult(String executionTarget, String clusterId, 
String jobId) {
+            this.executionTarget = executionTarget;
+            this.clusterId = clusterId;
+            this.jobId = jobId;
+        }
+    }
+
+    private static Optional<String> getClusterIdKeyName(String targetName) {
+        if (targetName.startsWith("yarn")) {
+            return Optional.of("yarn.application.id");
+        } else if (targetName.startsWith("kubernetes")) {
+            return Optional.of("kubernetes.cluster-id");
+        } else {
+            return Optional.empty();
+        }
+    }
 }
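
Note: with the new executeRefreshJob path, refresh results report the cluster id under a key that depends on the execution target. Below is a self-contained sketch of just that mapping (plain Java, mirroring getClusterIdKeyName above; the class name is illustrative):

    import java.util.Optional;

    class ClusterIdKeySketch {
        // Mirrors MaterializedTableManager#getClusterIdKeyName from the diff above:
        // yarn-* targets report "yarn.application.id", kubernetes-* targets report
        // "kubernetes.cluster-id", and all other targets report no cluster-id key.
        static Optional<String> clusterIdKey(String target) {
            if (target.startsWith("yarn")) {
                return Optional.of("yarn.application.id");
            } else if (target.startsWith("kubernetes")) {
                return Optional.of("kubernetes.cluster-id");
            }
            return Optional.empty();
        }

        public static void main(String[] args) {
            System.out.println(clusterIdKey("kubernetes-application")); // Optional[kubernetes.cluster-id]
            System.out.println(clusterIdKey("yarn-session"));           // Optional[yarn.application.id]
            System.out.println(clusterIdKey("remote"));                 // Optional.empty
        }
    }
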
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index e89e65f654fb..9612b57fccab 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -416,6 +416,14 @@ public class OperationExecutor {
                 
sessionContext.getSessionState().functionCatalog.copy(resourceManager));
     }
 
+    public <ClusterID> Optional<String> getSessionClusterId() {
+        ClusterClientFactory<ClusterID> clusterClientFactory =
+                
clusterClientServiceLoader.getClusterClientFactory(sessionContext.getSessionConf());
+        ClusterID clusterID = 
clusterClientFactory.getClusterId(sessionContext.getSessionConf());
+
+        return Optional.ofNullable(clusterID).map(ClusterID::toString);
+    }
+
     private static Executor lookupExecutor(
             StreamExecutionEnvironment executionEnvironment, ClassLoader 
userClassLoader) {
         try {
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 784ff40d15a3..8fdb1eac326b 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -145,7 +145,8 @@ class CatalogBaseTableResolutionTest {
                             "primary_constraint", 
Collections.singletonList("id")));
 
     private static final ContinuousRefreshHandler CONTINUOUS_REFRESH_HANDLER =
-            new ContinuousRefreshHandler("remote", 
JobID.generate().toHexString());
+            new ContinuousRefreshHandler(
+                    "remote", "StandaloneClusterId", 
JobID.generate().toHexString());
 
     private static final String DEFINITION_QUERY =
             String.format(
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java
index 2798f4051e46..19eff424dd47 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java
@@ -31,20 +31,22 @@ public class ContinuousRefreshHandler implements RefreshHandler, Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    // TODO: add clusterId for yarn and k8s resource manager
     private final String executionTarget;
+    private final String clusterId;
     private final String jobId;
+    private final @Nullable String restorePath;
 
-    private @Nullable final String restorePath;
-
-    public ContinuousRefreshHandler(String executionTarget, String jobId) {
+    public ContinuousRefreshHandler(String executionTarget, String clusterId, 
String jobId) {
         this.executionTarget = executionTarget;
+        this.clusterId = clusterId;
         this.jobId = jobId;
         this.restorePath = null;
     }
 
-    public ContinuousRefreshHandler(String executionTarget, String jobId, 
String restorePath) {
+    public ContinuousRefreshHandler(
+            String executionTarget, String clusterId, String jobId, String 
restorePath) {
         this.executionTarget = executionTarget;
+        this.clusterId = clusterId;
         this.jobId = jobId;
         this.restorePath = restorePath;
     }
@@ -57,6 +59,10 @@ public class ContinuousRefreshHandler implements RefreshHandler, Serializable {
         return jobId;
     }
 
+    public String getClusterId() {
+        return clusterId;
+    }
+
     public Optional<String> getRestorePath() {
         return Optional.ofNullable(restorePath);
     }
@@ -64,9 +70,10 @@ public class ContinuousRefreshHandler implements RefreshHandler, Serializable {
     @Override
     public String asSummaryString() {
         return String.format(
-                "{\njobId=%s,\n executionTarget=%s%s\n}",
-                jobId,
+                "{\n executionTarget=%s,\n clusterId=%s,\n jobId=%s%s\n}",
                 executionTarget,
+                clusterId,
+                jobId,
                 restorePath == null ? "" : ",\n restorePath=" + restorePath);
     }
 }
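
Note: the refresh handler summary string now leads with the execution target and cluster id. A small sketch (assuming flink-table-common on the classpath; the cluster id and job id values are made-up examples):

    import org.apache.flink.table.refresh.ContinuousRefreshHandler;

    class RefreshHandlerSummarySketch {
        public static void main(String[] args) {
            // Uses the new (executionTarget, clusterId, jobId) constructor from this commit.
            ContinuousRefreshHandler handler =
                    new ContinuousRefreshHandler(
                            "kubernetes-application",
                            "flink-native-k8s-sql-mt-application-1",
                            "a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4");
            // Expected output (roughly):
            // {
            //  executionTarget=kubernetes-application,
            //  clusterId=flink-native-k8s-sql-mt-application-1,
            //  jobId=a1b2c3d4e5f6a1b2c3d4e5f6a1b2c3d4
            // }
            System.out.println(handler.asSummaryString());
        }
    }
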

