lsyldliu commented on code in PR #25988:
URL: https://github.com/apache/flink/pull/25988#discussion_r1917570382


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java:
##########
@@ -31,22 +31,37 @@ public class ContinuousRefreshHandler implements 
RefreshHandler, Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    // TODO: add clusterId for yarn and k8s resource manager
     private final String executionTarget;
     private final String jobId;
 
+    private @Nullable final String clusterId;
+
     private @Nullable final String restorePath;
 
     public ContinuousRefreshHandler(String executionTarget, String jobId) {

Review Comment:
   This constructor is not used, so we can delete it; we can then also remove 
the `@Nullable` annotation from the `clusterId` field, because this is an 
internal class.
   
   BTW, the asSummaryString method also needs to be updated.



##########
flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java:
##########
@@ -273,7 +273,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
             return deserializeTable(
                     tableInfo.getTableKind(),
                     tableInfo.getCatalogTableInfo(),
-                    tableDataPath.getPath());
+                    tableDataPath.toString());

Review Comment:
   Why do we need to change this?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, 
clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), 
mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to 
cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+        private final String jobId;

Review Comment:
   We can make the code more organized:
   ```
           private final String executionTarget;
           private final String clusterIdKeyName;
           private final String clusterId;
           private final String jobId;
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, 
clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), 
mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to 
cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+        private final String jobId;
+
+        private final String executionTarget;
+
+        private final @Nullable String clusterId;
+
+        private final @Nullable String clusterIdKeyName;
+
+        public JobExecutionResult(
+                String jobId, String executionTarget, @Nullable String 
clusterId) {
+            this.jobId = jobId;
+            this.executionTarget = executionTarget;
+            this.clusterId = clusterId;
+            this.clusterIdKeyName = convertClusterIdKeyName(executionTarget);
+        }
+    }
+
+    private static @Nullable String convertClusterIdKeyName(String targetName) 
{
+        if (targetName.startsWith("yarn")) {
+            return "yarn.application.id";
+        } else if (targetName.startsWith("kubernetes")) {
+            return "kubernetes.cluster-id";
+        } else {
+            return null;

Review Comment:
   In the other deploy modes (remote — we don't support local, and I also don't 
know what it is for), shouldn't the clusterId we return be 
`StandaloneClusterId#toString` rather than a null value?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -416,6 +416,14 @@ public TableEnvironmentInternal getTableEnvironment(
                 
sessionContext.getSessionState().functionCatalog.copy(resourceManager));
     }
 
+    public <ClusterID> Optional<String> getSessionClusterId() {

Review Comment:
   We can always get the clusterId, so we don't need to use `Optional` here, right?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, 
clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), 
mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to 
cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+        private final String jobId;
+
+        private final String executionTarget;
+
+        private final @Nullable String clusterId;
+
+        private final @Nullable String clusterIdKeyName;

Review Comment:
   We don't need this field; the `executionTarget` is enough.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {

Review Comment:
   Let's check that the target is neither null nor local before executing the 
job; otherwise, throw an exception.
   
   For MT, I think we only support four deploy cluster modes: minicluster, 
standalone, yarn, and k8s, so those are the only ones we need to consider. The 
corresponding `execution.target` values are therefore limited to `remote`, 
`yarn`, and `k8s`: for minicluster and standalone clusters, `execution.target` 
is `remote`; for yarn it is `yarn-session` and `yarn-application`; for k8s it 
is `kubernetes-session` and `kubernetes-application`.
   
   When `execution.target` is local, I don't know what it is or what its 
function is, so we don't need to support it.
   
   



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, 
clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());

Review Comment:
   Do we have to set the jobId in advance for application mode?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {

Review Comment:
   ```
       private boolean isApplicationMode(Configuration configuration) {
           return configuration.get(TARGET).endsWith("application");
       }
   ```



##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"
+FLINK_IMAGE_NAME="test_kubernetes_materialized_table-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}"
+
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {
+    kubectl delete deployment ${APPLICATION_CLUSTER_ID}
+    kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+function open_session() {
+  local session_options=$1
+
+  if [ -z "$session_options" ]; then
+    session_options="{}"
+  fi
+
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"properties\": $session_options}" \
+    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions"; | jq -r '.sessionHandle'
+}
+
+function configure_session() {
+  local session_handle=$1
+  local statement=$2
+
+  response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
+
+  if [ "$response" != "{}" ]; then
+    echo "Configure session $session_handle $statement failed: $response"
+    exit 1
+  fi
+  echo "Configured session $session_handle $statement successfully"
+}
+
+function close_session() {
+  local session_handle=$1
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"session_handle\": \"$session_handle\"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/close
+}
+
+function execute_statement() {
+  local session_handle=$1
+  local statement=$2
+
+  local response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements)
+
+  local operation_handle=$(echo $response | jq -r '.operationHandle')
+  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
+    echo "Failed to execute statement: $statement, response: $response"
+    exit 1
+  fi
+  get_operation_result $session_handle $operation_handle
+}
+
+function get_operation_result() {
+  local session_handle=$1
+  local operation_handle=$2
+
+  local fields_array=()
+  local 
next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
+  while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
+  do
+    response=$(curl -s -X GET \
+      -H "Content-Type:  \
+       application/json" \
+      http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri)
+    result_type=$(echo $response | jq -r '.resultType')
+    result_kind=$(echo $response | jq -r '.resultKind')
+    next_uri=$(echo $response | jq -r '.nextResultUri')
+    errors=$(echo $response | jq -r '.errors')
+    if [ "$errors" != "null" ]; then
+      echo "fetch operation $operation_handle failed: $errors"
+      exit 1
+    fi
+    if [ result_kind == "SUCCESS" ]; then
+      fields_array+="SUCCESS"
+      break;
+    fi
+    if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == 
"SUCCESS_WITH_CONTENT" ]; then
+      new_fields=$(echo $response | jq -r '.results.data[].fields')
+      fields_array+=$new_fields
+    else
+      sleep 1
+    fi
+  done
+  echo $fields_array
+}
+
+function create_materialized_table_in_continous_mode() {
+  local session_handle=$1
+  local table_name=$2
+  local operation_handle=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
+        PARTITIONED BY (ds) \
+        with (\
+          'format' = 'json',\
+          'sink.rolling-policy.rollover-interval' = '10s',\
+          'sink.rolling-policy.check-interval' = '10s'\
+        )\
+        FRESHNESS = INTERVAL '10' SECOND \
+        AS SELECT \
+          DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
+          \`user\`, \
+          \`type\` \
+        FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') 
*\/ \"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | 
jq -r '.operationHandle')
+  get_operation_result $session_handle $operation_handle
+}
+
+function create_filesystem_source() {
+  local session_handle=$1
+  local table_name=$2
+  local path=$3
+
+  create_source_result=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE TABLE $table_name (\
+      \`timestamp\` TIMESTAMP_LTZ(3),\
+      \`user\` STRING,\
+      \`type\` STRING\
+    ) WITH (\
+      'format' = 'csv'\
+    )\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)
+
+    echo $create_source_result
+}
+
+S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}
+echo "[INFO] Create Materialized Table in Application Mode"
+session_options="{\"table.catalog-store.kind\": \"file\",
+                 \"execution.target\": \"kubernetes-application\",
+                 \"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
+                 \"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
+                 \"table.catalog-store.file.path\": 
\"$MATERIALIZED_TABLE_DATA_DIR/\",
+                 \"kubernetes.rest-service.exposed.type\": \"NodePort\",
+                 \"s3.endpoint\": \"$S3_ENDPOINT\",
+                 \"workflow-scheduler.type\": \"embedded\"}"
+
+session_handle=$(open_session "$session_options")
+echo "[INFO] Session Handle $session_handle"
+
+mkdir -p $MATERIALIZED_TABLE_DATA_DIR/catalog
+configure_session "$session_handle" "create catalog if not exists test_catalog 
with (\
+                                                'type' = 'test-filesystem',\
+                                                'default-database' = 
'test_db',\
+                                                'path' = 
'$MATERIALIZED_TABLE_DATA_DIR/'\
+                                    )"
+# prepare default db & default catalog
+configure_session $session_handle "use catalog test_catalog"
+configure_session $session_handle "create database if not exists test_db"
+create_filesystem_source $session_handle "filesystem_source"
+
+# 1. create materialized table in continuous mode
+create_materialized_table_in_continous_mode $session_handle 
"my_materialized_table_in_continuous_mode"
+
+configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' 
= '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
+kubectl wait --for=condition=Available --timeout=30s 
deploy/${APPLICATION_CLUSTER_ID} || exit 1

Review Comment:
   echo "[INFO] Wait first savepoint finishes"



##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"
+FLINK_IMAGE_NAME="test_kubernetes_materialized_table-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}"
+
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {
+    kubectl delete deployment ${APPLICATION_CLUSTER_ID}
+    kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+function open_session() {
+  local session_options=$1
+
+  if [ -z "$session_options" ]; then
+    session_options="{}"
+  fi
+
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"properties\": $session_options}" \
+    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions" | jq -r '.sessionHandle'
+}
+
+function configure_session() {
+  local session_handle=$1
+  local statement=$2
+
+  response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
+
+  if [ "$response" != "{}" ]; then
+    echo "Configure session $session_handle $statement failed: $response"
+    exit 1
+  fi
+  echo "Configured session $session_handle $statement successfully"
+}
+
+function close_session() {
+  local session_handle=$1
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"session_handle\": \"$session_handle\"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/close
+}
+
+function execute_statement() {
+  local session_handle=$1
+  local statement=$2
+
+  local response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements)
+
+  local operation_handle=$(echo $response | jq -r '.operationHandle')
+  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
+    echo "Failed to execute statement: $statement, response: $response"
+    exit 1
+  fi
+  get_operation_result $session_handle $operation_handle
+}
+
+function get_operation_result() {
+  local session_handle=$1
+  local operation_handle=$2
+
+  local fields_array=()
+  local 
next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
+  while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
+  do
+    response=$(curl -s -X GET \
+      -H "Content-Type:  \
+       application/json" \
+      http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri)
+    result_type=$(echo $response | jq -r '.resultType')
+    result_kind=$(echo $response | jq -r '.resultKind')
+    next_uri=$(echo $response | jq -r '.nextResultUri')
+    errors=$(echo $response | jq -r '.errors')
+    if [ "$errors" != "null" ]; then
+      echo "fetch operation $operation_handle failed: $errors"
+      exit 1
+    fi
+    if [ result_kind == "SUCCESS" ]; then
+      fields_array+="SUCCESS"
+      break;
+    fi
+    if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == 
"SUCCESS_WITH_CONTENT" ]; then
+      new_fields=$(echo $response | jq -r '.results.data[].fields')
+      fields_array+=$new_fields
+    else
+      sleep 1
+    fi
+  done
+  echo $fields_array
+}
+
+function create_materialized_table_in_continous_mode() {
+  local session_handle=$1
+  local table_name=$2
+  local operation_handle=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
+        PARTITIONED BY (ds) \
+        with (\
+          'format' = 'json',\
+          'sink.rolling-policy.rollover-interval' = '10s',\
+          'sink.rolling-policy.check-interval' = '10s'\
+        )\
+        FRESHNESS = INTERVAL '10' SECOND \
+        AS SELECT \
+          DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
+          \`user\`, \
+          \`type\` \
+        FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') 
*\/ \"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | 
jq -r '.operationHandle')
+  get_operation_result $session_handle $operation_handle
+}
+
+function create_filesystem_source() {
+  local session_handle=$1
+  local table_name=$2
+  local path=$3
+
+  create_source_result=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE TABLE $table_name (\
+      \`timestamp\` TIMESTAMP_LTZ(3),\
+      \`user\` STRING,\
+      \`type\` STRING\
+    ) WITH (\
+      'format' = 'csv'\
+    )\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)
+
+    echo $create_source_result
+}
+
+S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}
+echo "[INFO] Create Materialized Table in Application Mode"
+session_options="{\"table.catalog-store.kind\": \"file\",
+                 \"execution.target\": \"kubernetes-application\",
+                 \"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
+                 \"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
+                 \"table.catalog-store.file.path\": 
\"$MATERIALIZED_TABLE_DATA_DIR/\",
+                 \"kubernetes.rest-service.exposed.type\": \"NodePort\",
+                 \"s3.endpoint\": \"$S3_ENDPOINT\",
+                 \"workflow-scheduler.type\": \"embedded\"}"
+
+session_handle=$(open_session "$session_options")
+echo "[INFO] Session Handle $session_handle"
+
+mkdir -p $MATERIALIZED_TABLE_DATA_DIR/catalog
+configure_session "$session_handle" "create catalog if not exists test_catalog 
with (\
+                                                'type' = 'test-filesystem',\
+                                                'default-database' = 
'test_db',\
+                                                'path' = 
'$MATERIALIZED_TABLE_DATA_DIR/'\
+                                    )"
+# prepare default db & default catalog
+configure_session $session_handle "use catalog test_catalog"
+configure_session $session_handle "create database if not exists test_db"
+create_filesystem_source $session_handle "filesystem_source"
+
+# 1. create materialized table in continuous mode
+create_materialized_table_in_continous_mode $session_handle 
"my_materialized_table_in_continuous_mode"
+
+configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' 
= '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
+kubectl wait --for=condition=Available --timeout=30s 
deploy/${APPLICATION_CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods 
--selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o 
jsonpath='{..metadata.name}')
+wait_num_checkpoints $jm_pod_name 1
+
+# 2. suspend & resume materialized table in continuous mode
+execute_statement $session_handle "alter materialized table 
my_materialized_table_in_continuous_mode suspend"
+
+kubectl delete deployment $APPLICATION_CLUSTER_ID

Review Comment:
   Why do we need to manually delete clusters?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/application/ScriptRunner.java:
##########
@@ -61,6 +61,7 @@ public static void run(String script, OutputStream 
outputStream) throws Exceptio
                                         
SqlGatewayRestAPIVersion.getDefaultVersion())
                                 .build(),
                         Executors.newDirectExecutorService());
+        sessionContext.open();

Review Comment:
   This ScriptRunner is called by SqlDriver in JM,  we only execute the insert 
into statement in JM instead of materialized table related statements, so we 
don't need to call this open method.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, 
clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), 
mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);
+            throw new SqlGatewayException("Failed to deploy script to 
cluster.", t);
+        }
+    }
+
+    private static class JobExecutionResult {
+        private final String jobId;
+
+        private final String executionTarget;
+
+        private final @Nullable String clusterId;
+
+        private final @Nullable String clusterIdKeyName;
+
+        public JobExecutionResult(
+                String jobId, String executionTarget, @Nullable String 
clusterId) {
+            this.jobId = jobId;
+            this.executionTarget = executionTarget;
+            this.clusterId = clusterId;
+            this.clusterIdKeyName = convertClusterIdKeyName(executionTarget);
+        }
+    }
+
+    private static @Nullable String convertClusterIdKeyName(String targetName) 
{

Review Comment:
   ```suggestion
+        private static @Nullable String getClusterIdKeyName(String executionTarget) {
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(

Review Comment:
   Make it static and name it `executeRefreshJob`. BTW, I think we can add the 
static modifier to all of the utility methods to make the code more organized.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java:
##########
@@ -31,22 +31,37 @@ public class ContinuousRefreshHandler implements 
RefreshHandler, Serializable {
 
     private static final long serialVersionUID = 1L;
 
-    // TODO: add clusterId for yarn and k8s resource manager
     private final String executionTarget;
     private final String jobId;
 
+    private @Nullable final String clusterId;
+
     private @Nullable final String restorePath;
 
     public ContinuousRefreshHandler(String executionTarget, String jobId) {
         this.executionTarget = executionTarget;
         this.jobId = jobId;
+        this.clusterId = null;
+        this.restorePath = null;
+    }
+
+    public ContinuousRefreshHandler(

Review Comment:
   Make the code organized:
   ```
       public ContinuousRefreshHandler(
               String executionTarget, String clusterId, String jobId) {
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1123,31 +1129,59 @@ private static JobStatus getJobStatus(
     }
 
     private static void cancelJob(
-            OperationExecutor operationExecutor, OperationHandle handle, 
String jobId) {
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
         operationExecutor.callStopJobOperation(
-                operationExecutor.getTableEnvironment(),
+                getTableEnvironment(operationExecutor, refreshHandler),
                 handle,
-                new StopJobOperation(jobId, false, false));
+                new StopJobOperation(refreshHandler.getJobId(), false, false));
     }
 
     private static String stopJobWithSavepoint(
-            OperationExecutor executor, OperationHandle handle, String jobId) {
+            OperationExecutor executor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
         // check savepoint dir is configured
         Optional<String> savepointDir =
                 
executor.getSessionContext().getSessionConf().getOptional(SAVEPOINT_DIRECTORY);
-        if (!savepointDir.isPresent()) {
+        if (savepointDir.isEmpty()) {
             throw new ValidationException(
                     "Savepoint directory is not configured, can't stop job 
with savepoint.");
         }
+        String jobId = refreshHandler.getJobId();
         ResultFetcher resultFetcher =
                 executor.callStopJobOperation(
-                        executor.getTableEnvironment(),
+                        getTableEnvironment(executor, refreshHandler),
                         handle,
                         new StopJobOperation(jobId, true, false));
         List<RowData> results = fetchAllResults(resultFetcher);
         return results.get(0).getString(0).toString();
     }
 
+    /**
+     * Using the target and session cluster id configuration get the 
TableEnvironment.
+     *
+     * @param executor OperationExecutor

Review Comment:
   This documentation is meaningless.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {

Review Comment:
   We should also consider remote mode.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());

Review Comment:
   We don't need to merge the sessionConf with executionConfig here; it will be 
merged when calling the method `operationExecutor.executeStatement`.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -1244,4 +1278,99 @@ private static List<RowData> 
fetchAllResults(ResultFetcher resultFetcher) {
         }
         return results;
     }
+
+    private JobExecutionResult executeJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        if 
(isApplicationMode(operationExecutor.getSessionContext().getSessionConf())) {
+            return executeApplicationJob(script, executionConfig, 
operationExecutor);
+        } else {
+            return executeSessionJob(script, executionConfig, 
operationExecutor, operationHandle);
+        }
+    }
+
+    private boolean isApplicationMode(Configuration configuration) {
+        String target = configuration.get(TARGET);
+        return target.endsWith("application");
+    }
+
+    private JobExecutionResult executeSessionJob(
+            String script,
+            Configuration executionConfig,
+            OperationExecutor operationExecutor,
+            OperationHandle operationHandle) {
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+
+        // submit flink streaming job
+        ResultFetcher resultFetcher =
+                operationExecutor.executeStatement(operationHandle, 
mergedConfig, script);
+
+        // get execution.target and jobId, clusterId
+        List<RowData> results = fetchAllResults(resultFetcher);
+        String jobId = results.get(0).getString(0).toString();
+        String executeTarget = 
operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+        Optional<String> clusterId = operationExecutor.getSessionClusterId();
+
+        return new JobExecutionResult(jobId, executeTarget, 
clusterId.orElse(null));
+    }
+
+    private JobExecutionResult executeApplicationJob(
+            String script, Configuration executionConfig, OperationExecutor 
operationExecutor) {
+        List<String> arguments = new ArrayList<>();
+        arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt());
+        arguments.add(script);
+
+        Configuration mergedConfig =
+                new 
Configuration(operationExecutor.getSessionContext().getSessionConf());
+        mergedConfig.addAll(executionConfig);
+        JobID jobId = new JobID();
+        mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString());
+
+        ApplicationConfiguration applicationConfiguration =
+                new ApplicationConfiguration(
+                        arguments.toArray(new String[0]), 
SqlDriver.class.getName());
+        try {
+            String clusterId =
+                    new ApplicationClusterDeployer(new 
DefaultClusterClientServiceLoader())
+                            .run(mergedConfig, applicationConfiguration)
+                            .toString();
+
+            return new JobExecutionResult(jobId.toString(), 
mergedConfig.get(TARGET), clusterId);
+        } catch (Throwable t) {
+            LOG.error("Failed to deploy script to application cluster.", t);

Review Comment:
   It would be better if we could also log the SQL script.



##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"

Review Comment:
   You only test application mode here, so can we delete this?



##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"
+FLINK_IMAGE_NAME="test_kubernetes_materialized_table-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}"
+
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {

Review Comment:
   Is this function unused?



##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"
+FLINK_IMAGE_NAME="test_kubernetes_materialized_table-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}"
+
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {
+    kubectl delete deployment ${APPLICATION_CLUSTER_ID}
+    kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+function open_session() {
+  local session_options=$1
+
+  if [ -z "$session_options" ]; then
+    session_options="{}"
+  fi
+
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"properties\": $session_options}" \
+    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions" | jq -r '.sessionHandle'
+}
+
+function configure_session() {
+  local session_handle=$1
+  local statement=$2
+
+  response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
+
+  if [ "$response" != "{}" ]; then
+    echo "Configure session $session_handle $statement failed: $response"
+    exit 1
+  fi
+  echo "Configured session $session_handle $statement successfully"
+}
+
+function close_session() {

Review Comment:
   Is this one also unused?



##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"
+FLINK_IMAGE_NAME="test_kubernetes_materialized_table-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}"
+
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {
+    kubectl delete deployment ${APPLICATION_CLUSTER_ID}
+    kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+function open_session() {
+  local session_options=$1
+
+  if [ -z "$session_options" ]; then
+    session_options="{}"
+  fi
+
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"properties\": $session_options}" \
+    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions"; | jq -r '.sessionHandle'
+}
+
+function configure_session() {
+  local session_handle=$1
+  local statement=$2
+
+  response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
+
+  if [ "$response" != "{}" ]; then
+    echo "Configure session $session_handle $statement failed: $response"
+    exit 1
+  fi
+  echo "Configured session $session_handle $statement successfully"
+}
+
+function close_session() {
+  local session_handle=$1
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"session_handle\": \"$session_handle\"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/close
+}
+
+function execute_statement() {
+  local session_handle=$1
+  local statement=$2
+
+  local response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements)
+
+  local operation_handle=$(echo $response | jq -r '.operationHandle')
+  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
+    echo "Failed to execute statement: $statement, response: $response"
+    exit 1
+  fi
+  get_operation_result $session_handle $operation_handle
+}
+
+function get_operation_result() {
+  local session_handle=$1
+  local operation_handle=$2
+
+  local fields_array=()
+  local 
next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
+  while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
+  do
+    response=$(curl -s -X GET \
+      -H "Content-Type:  \
+       application/json" \
+      http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri)
+    result_type=$(echo $response | jq -r '.resultType')
+    result_kind=$(echo $response | jq -r '.resultKind')
+    next_uri=$(echo $response | jq -r '.nextResultUri')
+    errors=$(echo $response | jq -r '.errors')
+    if [ "$errors" != "null" ]; then
+      echo "fetch operation $operation_handle failed: $errors"
+      exit 1
+    fi
+    if [ result_kind == "SUCCESS" ]; then
+      fields_array+="SUCCESS"
+      break;
+    fi
+    if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == 
"SUCCESS_WITH_CONTENT" ]; then
+      new_fields=$(echo $response | jq -r '.results.data[].fields')
+      fields_array+=$new_fields
+    else
+      sleep 1
+    fi
+  done
+  echo $fields_array
+}
+
+function create_materialized_table_in_continous_mode() {
+  local session_handle=$1
+  local table_name=$2
+  local operation_handle=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
+        PARTITIONED BY (ds) \
+        with (\
+          'format' = 'json',\
+          'sink.rolling-policy.rollover-interval' = '10s',\
+          'sink.rolling-policy.check-interval' = '10s'\
+        )\
+        FRESHNESS = INTERVAL '10' SECOND \
+        AS SELECT \
+          DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
+          \`user\`, \
+          \`type\` \
+        FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') 
*\/ \"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | 
jq -r '.operationHandle')
+  get_operation_result $session_handle $operation_handle
+}
+
+function create_filesystem_source() {
+  local session_handle=$1
+  local table_name=$2
+  local path=$3
+
+  create_source_result=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE TABLE $table_name (\
+      \`timestamp\` TIMESTAMP_LTZ(3),\
+      \`user\` STRING,\
+      \`type\` STRING\
+    ) WITH (\
+      'format' = 'csv'\
+    )\"}" \
+    
http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)
+
+    echo $create_source_result
+}
+
+S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}
+echo "[INFO] Create Materialized Table in Application Mode"
+session_options="{\"table.catalog-store.kind\": \"file\",
+                 \"execution.target\": \"kubernetes-application\",
+                 \"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
+                 \"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
+                 \"table.catalog-store.file.path\": 
\"$MATERIALIZED_TABLE_DATA_DIR/\",
+                 \"kubernetes.rest-service.exposed.type\": \"NodePort\",
+                 \"s3.endpoint\": \"$S3_ENDPOINT\",
+                 \"workflow-scheduler.type\": \"embedded\"}"
+
+session_handle=$(open_session "$session_options")
+echo "[INFO] Session Handle $session_handle"
+
+mkdir -p $MATERIALIZED_TABLE_DATA_DIR/catalog
+configure_session "$session_handle" "create catalog if not exists test_catalog 
with (\
+                                                'type' = 'test-filesystem',\
+                                                'default-database' = 
'test_db',\
+                                                'path' = 
'$MATERIALIZED_TABLE_DATA_DIR/'\
+                                    )"
+# prepare default db & default catalog
+configure_session $session_handle "use catalog test_catalog"
+configure_session $session_handle "create database if not exists test_db"
+create_filesystem_source $session_handle "filesystem_source"
+
+# 1. create materialized table in continuous mode
+create_materialized_table_in_continous_mode $session_handle 
"my_materialized_table_in_continuous_mode"
+
+configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' 
= '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
+kubectl wait --for=condition=Available --timeout=30s 
deploy/${APPLICATION_CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods 
--selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o 
jsonpath='{..metadata.name}')
+wait_num_checkpoints $jm_pod_name 1
+
+# 2. suspend & resume materialized table in continuous mode
+execute_statement $session_handle "alter materialized table 
my_materialized_table_in_continuous_mode suspend"

Review Comment:
   echo "[INFO] Suspend materialized table"



##########
flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh:
##########
@@ -0,0 +1,242 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+SESSION_CLUSTER_ID="flink-native-k8s-sql-mt-session-1"
+FLINK_IMAGE_NAME="test_kubernetes_materialized_table-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls 
${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit 
--serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image 
${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+       echo "ERROR: Could not build image. Aborting..."
+       exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}"
+
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
# Cleanup hook invoked by the common test harness on exit: removes the
# application-mode deployment and the cluster role binding created in setup.
function internal_cleanup {
    kubectl delete deployment "${APPLICATION_CLUSTER_ID}"
    kubectl delete clusterrolebinding "${CLUSTER_ROLE_BINDING}"
}
+
# Opens a SQL Gateway session via the REST API and prints the session handle.
# $1 - JSON object of session properties; defaults to "{}" when empty.
function open_session() {
  local session_options=$1

  if [ -z "$session_options" ]; then
    session_options="{}"
  fi

  # BUG FIX: the original had a stray ';' between the URL and the pipe
  # ("…/sessions"; | jq …), which is a bash syntax error — a pipeline cannot
  # start with '|'. The curl output must be piped directly into jq.
  curl -s -X POST \
    -H "Content-Type: application/json" \
    -d "{\"properties\": $session_options}" \
    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions" | jq -r '.sessionHandle'
}
+
# Executes a statement on the configure-session endpoint; the gateway returns
# an empty JSON object on success, anything else fails the test immediately.
# $1 - session handle, $2 - SQL statement.
function configure_session() {
  local session_handle=$1
  local statement=$2

  local url="http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session"
  # `response` is intentionally not `local`: the original exposed it globally.
  response=$(curl -s -X POST \
    -H "Content-Type: application/json" \
    -d "{\"statement\": \"$statement\"}" \
    "$url")

  if [ "$response" != "{}" ]; then
    echo "Configure session $session_handle $statement failed: $response"
    exit 1
  fi
  echo "Configured session $session_handle $statement successfully"
}
+
# Closes a SQL Gateway session via the REST API.
# $1 - session handle.
function close_session() {
  local session_handle=$1
  local payload="{\"session_handle\": \"$session_handle\"}"
  curl -s -X POST \
    -H "Content-Type: application/json" \
    -d "$payload" \
    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions/close"
}
+
# Submits a statement on the v3 statements endpoint and prints the operation
# result. Exits non-zero when the gateway returns no operation handle.
# $1 - session handle, $2 - SQL statement.
function execute_statement() {
  local session_handle=$1
  local statement=$2

  local response
  response=$(curl -s -X POST \
    -H "Content-Type: application/json" \
    -d "{\"statement\": \"$statement\"}" \
    "http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements")

  local operation_handle
  operation_handle=$(echo $response | jq -r '.operationHandle')
  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
    echo "Failed to execute statement: $statement, response: $response"
    exit 1
  fi
  get_operation_result $session_handle $operation_handle
}
+
# Polls the paged results of an operation until the last page is consumed and
# echoes the collected row fields ("SUCCESS" for result sets without content).
# $1 - session handle, $2 - operation handle. Exits non-zero on fetch errors.
function get_operation_result() {
  local session_handle=$1
  local operation_handle=$2

  local fields_array=()
  local next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
  while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
  do
    response=$(curl -s -X GET \
      -H "Content-Type: application/json" \
      "http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri")
    result_type=$(echo $response | jq -r '.resultType')
    result_kind=$(echo $response | jq -r '.resultKind')
    next_uri=$(echo $response | jq -r '.nextResultUri')
    errors=$(echo $response | jq -r '.errors')
    if [ "$errors" != "null" ]; then
      echo "fetch operation $operation_handle failed: $errors"
      exit 1
    fi
    # BUG FIX: the original tested the literal word `result_kind`
    # ([ result_kind == "SUCCESS" ]) — missing '$', so the branch never fired.
    if [ "$result_kind" == "SUCCESS" ]; then
      # BUG FIX: use array-append syntax; `fields_array+="SUCCESS"` string-
      # concatenates onto element 0 instead of appending a new element.
      fields_array+=("SUCCESS")
      break
    fi
    if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == "SUCCESS_WITH_CONTENT" ]; then
      new_fields=$(echo $response | jq -r '.results.data[].fields')
      fields_array+=("$new_fields")
    else
      # Result not ready yet — back off briefly before polling again.
      sleep 1
    fi
  done
  # Print every collected element, not just element 0.
  echo "${fields_array[@]}"
}
+
# Creates a partitioned materialized table in continuous mode and waits for
# the CREATE operation to complete via get_operation_result.
# $1 - session handle, $2 - materialized table name.
# NOTE: the historical "continous" spelling is kept because the rest of the
# script invokes the function under this name.
function create_materialized_table_in_continous_mode() {
  local session_handle=$1
  local table_name=$2
  local operation_handle=$(curl -s -X POST \
    -H "Content-Type: application/json" \
    -d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
        PARTITIONED BY (ds) \
        with (\
          'format' = 'json',\
          'sink.rolling-policy.rollover-interval' = '10s',\
          'sink.rolling-policy.check-interval' = '10s'\
        )\
        FRESHNESS = INTERVAL '10' SECOND \
        AS SELECT \
          DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
          \`user\`, \
          \`type\` \
        FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') *\/ \"}" \
    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | jq -r '.operationHandle')
  # Fail fast when submission itself failed, mirroring execute_statement;
  # the original passed a null handle straight into get_operation_result.
  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
    echo "Failed to create materialized table $table_name"
    exit 1
  fi
  get_operation_result $session_handle $operation_handle
}
+
# Creates a CSV-format source table in the current catalog/database and
# echoes the raw gateway response so the caller can inspect it.
# $1 - session handle, $2 - table name.
function create_filesystem_source() {
  local session_handle=$1
  local table_name=$2
  # DEAD CODE FIX: the original declared `local path=$3` but never used it
  # (callers pass only two arguments), so the unused local is dropped.

  create_source_result=$(curl -s -X POST \
    -H "Content-Type: application/json" \
    -d "{\"statement\": \"CREATE TABLE $table_name (\
      \`timestamp\` TIMESTAMP_LTZ(3),\
      \`user\` STRING,\
      \`type\` STRING\
    ) WITH (\
      'format' = 'csv'\
    )\"}" \
    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)

    echo $create_source_result
}
+
# Rewrite localhost in the MinIO endpoint so pods inside the k8s cluster can
# reach the host-side S3 service.
S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}

echo "[INFO] Create Materialized Table in Application Mode"
session_options="{\"table.catalog-store.kind\": \"file\",
                 \"execution.target\": \"kubernetes-application\",
                 \"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
                 \"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
                 \"table.catalog-store.file.path\": \"$MATERIALIZED_TABLE_DATA_DIR/\",
                 \"kubernetes.rest-service.exposed.type\": \"NodePort\",
                 \"s3.endpoint\": \"$S3_ENDPOINT\",
                 \"workflow-scheduler.type\": \"embedded\"}"

session_handle=$(open_session "$session_options")
echo "[INFO] Session Handle $session_handle"

# NOTE(review): MATERIALIZED_TABLE_DATA_DIR is an s3:// URI, so this mkdir
# creates a local directory literally named "s3:…" rather than an S3 prefix —
# confirm whether this line is needed at all.
mkdir -p $MATERIALIZED_TABLE_DATA_DIR/catalog
configure_session "$session_handle" "create catalog if not exists test_catalog with (\
                                                'type' = 'test-filesystem',\
                                                'default-database' = 'test_db',\
                                                'path' = '$MATERIALIZED_TABLE_DATA_DIR/'\
                                    )"
# prepare default db & default catalog
configure_session $session_handle "use catalog test_catalog"
configure_session $session_handle "create database if not exists test_db"
create_filesystem_source $session_handle "filesystem_source"

# 1. create materialized table in continuous mode
create_materialized_table_in_continous_mode $session_handle "my_materialized_table_in_continuous_mode"

configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' = '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
kubectl wait --for=condition=Available --timeout=30s deploy/${APPLICATION_CLUSTER_ID} || exit 1
jm_pod_name=$(kubectl get pods --selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
wait_num_checkpoints $jm_pod_name 1

# 2. suspend & resume materialized table in continuous mode
echo "[INFO] Suspend materialized table"
execute_statement $session_handle "alter materialized table my_materialized_table_in_continuous_mode suspend"

kubectl delete deployment $APPLICATION_CLUSTER_ID
kubectl wait --for=delete deployment/$APPLICATION_CLUSTER_ID

echo "[INFO] Resume materialized table"
execute_statement $session_handle "alter materialized table my_materialized_table_in_continuous_mode resume"

Review Comment:
   echo "[INFO] Resume materialized table"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to