This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4a2535cb8ab9f4ef807e98e8b92a6f921960c059
Author: Feng Jin <jinfeng1...@gmail.com>
AuthorDate: Sun Jan 19 21:10:30 2025 +0800

    [FLINK-37133][table] Support Submitting Refresh Job of Materialized Table to Yarn/K8s
---
 .../client/deployment/StandaloneClusterId.java     |   5 +
 flink-end-to-end-tests/run-nightly-tests.sh        |   1 +
 .../test_kubernetes_materialized_table.sh          | 241 +++++++++++++++++++++
 .../MaterializedTableManager.java                  | 195 ++++++++++++++---
 .../service/operation/OperationExecutor.java       |   8 +
 .../catalog/CatalogBaseTableResolutionTest.java    |   3 +-
 .../table/refresh/ContinuousRefreshHandler.java    |  21 +-
 7 files changed, 433 insertions(+), 41 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
index 6ebfc7201b99..402dc98fba40 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterId.java
@@ -27,4 +27,9 @@ public class StandaloneClusterId {
     public static StandaloneClusterId getInstance() {
         return INSTANCE;
     }
+
+    @Override
+    public String toString() {
+        return "StandaloneClusterId";
+    }
 }

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 1eb7d49405d4..92304ef616a0 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -180,6 +180,7 @@ function run_group_2 {
     run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions"
     run_test "Sql Jdbc Driver end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_jdbc_driver.sh" "skip_check_exceptions"
     run_test "Run kubernetes SQL application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_sql_application.sh"
+    run_test "Run kubernetes Materialized Table test" "$END_TO_END_DIR/test-scripts/test_kubernetes_materialized_table.sh"
     run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions"
     run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions"

diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh
new file mode 100755
index 000000000000..addc04f28bda
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_materialized_table.sh
@@ -0,0 +1,241 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script aims to test Materialized Table and Kubernetes integration.
+
+source "$(dirname "$0")"/common_kubernetes.sh
+
+CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P`
+CLUSTER_ROLE_BINDING="flink-role-binding-default"
+APPLICATION_CLUSTER_ID="flink-native-k8s-sql-mt-application-1"
+FLINK_IMAGE_NAME="test_kubernetes_mt_application-1"
+LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
+IMAGE_BUILD_RETRIES=3
+IMAGE_BUILD_BACKOFF=2
+
+# copy test-filesystem jar & hadoop plugin
+TEST_FILE_SYSTEM_JAR=`ls ${END_TO_END_DIR}/../flink-test-utils-parent/flink-table-filesystem-test-utils/target/flink-table-filesystem-test-utils-*.jar`
+cp $TEST_FILE_SYSTEM_JAR ${FLINK_DIR}/lib/
+add_optional_plugin "s3-fs-hadoop"
+
+# start kubernetes
+start_kubernetes
+kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit --serviceaccount=default:default --namespace=default
+
+# build image
+if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image ${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
+  echo "ERROR: Could not build image. Aborting..."
+  exit 1
+fi
+
+# setup materialized table data dir
+echo "[INFO] Start S3 env"
+source "$(dirname "$0")"/common_s3_minio.sh
+s3_setup hadoop
+S3_TEST_DATA_WORDS_URI="s3://$IT_CASE_S3_BUCKET/"
+MATERIALIZED_TABLE_DATA_DIR="${S3_TEST_DATA_WORDS_URI}/test_materialized_table-$(uuidgen)"
+
+echo "[INFO] Start SQL Gateway"
+set_config_key "sql-gateway.endpoint.rest.address" "localhost"
+start_sql_gateway
+
+SQL_GATEWAY_REST_PORT=8083
+
+function internal_cleanup {
+  kubectl delete deployment ${APPLICATION_CLUSTER_ID}
+  kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING}
+}
+
+function open_session() {
+  local session_options=$1
+
+  if [ -z "$session_options" ]; then
+    session_options="{}"
+  fi
+
+  curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"properties\": $session_options}" \
+    "http://localhost:$SQL_GATEWAY_REST_PORT/sessions" | jq -r '.sessionHandle'
+}
+
+function configure_session() {
+  local session_handle=$1
+  local statement=$2
+
+  response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/configure-session)
+
+  if [ "$response" != "{}" ]; then
+    echo "Configure session $session_handle $statement failed: $response"
+    exit 1
+  fi
+  echo "Configured session $session_handle $statement successfully"
+}
+
+function execute_statement() {
+  local session_handle=$1
+  local statement=$2
+
+  local response=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"$statement\"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/v3/sessions/$session_handle/statements)
+
+  local operation_handle=$(echo $response | jq -r '.operationHandle')
+  if [ -z "$operation_handle" ] || [ "$operation_handle" == "null" ]; then
+    echo "Failed to execute statement: $statement, response: $response"
+    exit 1
+  fi
+  get_operation_result $session_handle $operation_handle
+}
+
+function get_operation_result() {
+  local session_handle=$1
+  local operation_handle=$2
+
+  local fields_array=()
+  local next_uri="v3/sessions/$session_handle/operations/$operation_handle/result/0"
+  while [ ! -z "$next_uri" ] && [ "$next_uri" != "null" ];
+  do
+    response=$(curl -s -X GET \
+      -H "Content-Type: application/json" \
+      http://localhost:$SQL_GATEWAY_REST_PORT/$next_uri)
+    result_type=$(echo $response | jq -r '.resultType')
+    result_kind=$(echo $response | jq -r '.resultKind')
+    next_uri=$(echo $response | jq -r '.nextResultUri')
+    errors=$(echo $response | jq -r '.errors')
+    if [ "$errors" != "null" ]; then
+      echo "fetch operation $operation_handle failed: $errors"
+      exit 1
+    fi
+    if [ "$result_kind" == "SUCCESS" ]; then
+      fields_array+="SUCCESS"
+      break;
+    fi
+    if [ "$result_type" != "NOT_READY" ] && [ "$result_kind" == "SUCCESS_WITH_CONTENT" ]; then
+      new_fields=$(echo $response | jq -r '.results.data[].fields')
+      fields_array+=$new_fields
+    else
+      sleep 1
+    fi
+  done
+  echo $fields_array
+}
+
+function create_materialized_table_in_continuous_mode() {
+  local session_handle=$1
+  local table_name=$2
+  local operation_handle=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE MATERIALIZED TABLE $table_name \
+      PARTITIONED BY (ds) \
+      with (\
+        'format' = 'json',\
+        'sink.rolling-policy.rollover-interval' = '10s',\
+        'sink.rolling-policy.check-interval' = '10s'\
+      )\
+      FRESHNESS = INTERVAL '10' SECOND \
+      AS SELECT \
+        DATE_FORMAT(\`timestamp\`, 'yyyy-MM-dd') AS ds, \
+        \`user\`, \
+        \`type\` \
+      FROM filesystem_source \/*+ options('source.monitor-interval' = '10s') *\/ \"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements | jq -r '.operationHandle')
+  get_operation_result $session_handle $operation_handle
+}
+
+function create_filesystem_source() {
+  local session_handle=$1
+  local table_name=$2
+  local path=$3
+
+  create_source_result=$(curl -s -X POST \
+    -H "Content-Type: application/json" \
+    -d "{\"statement\": \"CREATE TABLE $table_name (\
+      \`timestamp\` TIMESTAMP_LTZ(3),\
+      \`user\` STRING,\
+      \`type\` STRING\
+    ) WITH (\
+      'format' = 'csv'\
+    )\"}" \
+    http://localhost:$SQL_GATEWAY_REST_PORT/sessions/$session_handle/statements/)
+
+  echo $create_source_result
+}
+
+S3_ENDPOINT=${S3_ENDPOINT//localhost/$(get_host_machine_address)}
+echo "[INFO] Create a new session"
+session_options="{\"table.catalog-store.kind\": \"file\",
+  \"table.catalog-store.file.path\": \"$MATERIALIZED_TABLE_DATA_DIR/\",
+  \"execution.target\": \"kubernetes-application\",
+  \"kubernetes.cluster-id\": \"${APPLICATION_CLUSTER_ID}\",
+  \"kubernetes.container.image.ref\": \"${FLINK_IMAGE_NAME}\",
+  \"jobmanager.memory.process.size\": \"1088m\",
+  \"taskmanager.memory.process.size\": \"1000m\",
+  \"kubernetes.jobmanager.cpu\": \"0.5\",
+  \"kubernetes.taskmanager.cpu\": \"0.5\",
+  \"kubernetes.rest-service.exposed.type\": \"NodePort\",
+  \"s3.endpoint\": \"$S3_ENDPOINT\",
+  \"workflow-scheduler.type\": \"embedded\"}"
+
+session_handle=$(open_session "$session_options")
+echo "[INFO] Session Handle $session_handle"
+
+# prepare catalog & database
+echo "[INFO] Create catalog & database"
+configure_session "$session_handle" "create catalog if not exists test_catalog with (\
+  'type' = 'test-filesystem',\
+  'default-database' = 'test_db',\
+  'path' = '$MATERIALIZED_TABLE_DATA_DIR/'\
+  )"
+configure_session $session_handle "use catalog test_catalog"
+configure_session $session_handle "create database if not exists test_db"
+create_filesystem_source $session_handle "filesystem_source"
+
+# 1. create materialized table in continuous mode
+echo "[INFO] Create Materialized table in continuous mode"
+create_materialized_table_in_continuous_mode $session_handle "my_materialized_table_in_continuous_mode"
+echo "[INFO] Wait deployment ${APPLICATION_CLUSTER_ID} Available"
+kubectl wait --for=condition=Available --timeout=30s deploy/${APPLICATION_CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods --selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
+echo "[INFO] Wait first checkpoint finished"
+wait_num_checkpoints $jm_pod_name 1
+
+# 2. suspend & resume materialized table in continuous mode
+echo "[INFO] Suspend materialized table"
+configure_session $session_handle "set 'execution.checkpointing.savepoint-dir' = '$MATERIALIZED_TABLE_DATA_DIR/savepoint'"
+execute_statement $session_handle "alter materialized table my_materialized_table_in_continuous_mode suspend"
+
+kubectl wait --for=delete deployment/$APPLICATION_CLUSTER_ID
+
+echo "[INFO] Resume materialized table"
+execute_statement $session_handle "alter materialized table my_materialized_table_in_continuous_mode resume"
+kubectl wait --for=condition=Available --timeout=30s deploy/${APPLICATION_CLUSTER_ID} || exit 1
+jm_pod_name=$(kubectl get pods --selector="app=${APPLICATION_CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')
+echo "[INFO] Wait resumed job finished the first checkpoint"
+wait_num_checkpoints $jm_pod_name 1
+
+# 3. verify resumed continuous job is restored from savepoint
+mkdir -p "$LOCAL_LOGS_PATH"
+kubectl logs $jm_pod_name > $LOCAL_LOGS_PATH/jobmanager.log
+grep -E "Starting job [A-Za-z0-9]+ from savepoint" $LOCAL_LOGS_PATH/jobmanager.log

diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 2c2ea78f1003..14ad047ebfb0 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -20,12 +20,17 @@ package org.apache.flink.table.gateway.service.materializedtable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.deployment.application.ApplicationConfiguration;
+import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogMaterializedTable;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.IntervalFreshness;
@@ -42,6 +47,7 @@ import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.factories.WorkflowSchedulerFactoryUtil;
 import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import
org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestOptions; import org.apache.flink.table.gateway.service.operation.OperationExecutor; @@ -61,6 +67,7 @@ import org.apache.flink.table.refresh.ContinuousRefreshHandler; import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer; import org.apache.flink.table.refresh.RefreshHandler; import org.apache.flink.table.refresh.RefreshHandlerSerializer; +import org.apache.flink.table.runtime.application.SqlDriver; import org.apache.flink.table.types.logical.LogicalTypeFamily; import org.apache.flink.table.workflow.CreatePeriodicRefreshWorkflow; import org.apache.flink.table.workflow.CreateRefreshWorkflow; @@ -95,6 +102,7 @@ import static org.apache.flink.configuration.CheckpointingOptions.SAVEPOINT_DIRE import static org.apache.flink.configuration.DeploymentOptions.TARGET; import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; import static org.apache.flink.configuration.PipelineOptions.NAME; +import static org.apache.flink.configuration.PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID; import static org.apache.flink.configuration.StateRecoveryOptions.SAVEPOINT_PATH; import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.DATE_FORMATTER; import static org.apache.flink.table.api.config.MaterializedTableConfigOptions.PARTITION_FIELDS; @@ -337,12 +345,12 @@ public class MaterializedTableManager { tableIdentifier, refreshHandler.getJobId())); } - String savepointPath = - stopJobWithSavepoint(operationExecutor, handle, refreshHandler.getJobId()); + String savepointPath = stopJobWithSavepoint(operationExecutor, handle, refreshHandler); ContinuousRefreshHandler updateRefreshHandler = new ContinuousRefreshHandler( refreshHandler.getExecutionTarget(), + refreshHandler.getClusterId(), refreshHandler.getJobId(), savepointPath); @@ -565,17 +573,12 @@ public class MaterializedTableManager { materializedTableIdentifier, catalogMaterializedTable.getDefinitionQuery(), dynamicOptions); - // submit flink streaming job - ResultFetcher resultFetcher = - operationExecutor.executeStatement(handle, customConfig, insertStatement); - // get execution.target and jobId, currently doesn't support yarn and k8s, so doesn't - // get clusterId - List<RowData> results = fetchAllResults(resultFetcher); - String jobId = results.get(0).getString(0).toString(); - String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET); + JobExecutionResult result = + executeRefreshJob(insertStatement, customConfig, operationExecutor, handle); ContinuousRefreshHandler continuousRefreshHandler = - new ContinuousRefreshHandler(executeTarget, jobId); + new ContinuousRefreshHandler( + result.executionTarget, result.clusterId, result.jobId); byte[] serializedBytes = serializeContinuousHandler(continuousRefreshHandler); updateRefreshHandler( @@ -659,17 +662,19 @@ public class MaterializedTableManager { "Begin to refreshing the materialized table {}, statement: {}", materializedTableIdentifier, insertStatement); - ResultFetcher resultFetcher = - operationExecutor.executeStatement(handle, customConfig, insertStatement); + JobExecutionResult result = + executeRefreshJob(insertStatement, customConfig, operationExecutor, handle); - List<RowData> results = fetchAllResults(resultFetcher); - String jobId = 
results.get(0).getString(0).toString(); - String executeTarget = - operationExecutor.getSessionContext().getSessionConf().get(TARGET); Map<StringData, StringData> clusterInfo = new HashMap<>(); clusterInfo.put( - StringData.fromString(TARGET.key()), StringData.fromString(executeTarget)); - // TODO get clusterId + StringData.fromString(TARGET.key()), + StringData.fromString(result.executionTarget)); + Optional<String> clusterIdKeyName = getClusterIdKeyName(result.executionTarget); + clusterIdKeyName.ifPresent( + s -> + clusterInfo.put( + StringData.fromString(s), + StringData.fromString(result.clusterId))); return ResultFetcher.fromResults( handle, @@ -680,7 +685,7 @@ public class MaterializedTableManager { DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))), Collections.singletonList( GenericRowData.of( - StringData.fromString(jobId), + StringData.fromString(result.jobId), new GenericMapData(clusterInfo)))); } catch (Exception e) { throw new SqlExecutionException( @@ -895,7 +900,7 @@ public class MaterializedTableManager { // handler) List<MaterializedTableChange> tableChanges = new ArrayList<>(op.getTableChanges()); TableChange.ModifyRefreshHandler modifyRefreshHandler = - genereateResetSavepointTableChange( + generateResetSavepointTableChange( oldMaterializedTable.getSerializedRefreshHandler()); tableChanges.add(modifyRefreshHandler); @@ -953,18 +958,19 @@ public class MaterializedTableManager { op.getTableIdentifier(), rollbackChanges, oldMaterializedTable); } - private TableChange.ModifyRefreshHandler genereateResetSavepointTableChange( + private TableChange.ModifyRefreshHandler generateResetSavepointTableChange( byte[] serializedContinuousHandler) { ContinuousRefreshHandler continuousRefreshHandler = deserializeContinuousHandler(serializedContinuousHandler); - ContinuousRefreshHandler resetedRefreshHandler = + ContinuousRefreshHandler resetContinuousRefreshHandler = new ContinuousRefreshHandler( continuousRefreshHandler.getExecutionTarget(), + continuousRefreshHandler.getClusterId(), continuousRefreshHandler.getJobId()); return TableChange.modifyRefreshHandler( - resetedRefreshHandler.asSummaryString(), - serializeContinuousHandler(resetedRefreshHandler)); + resetContinuousRefreshHandler.asSummaryString(), + serializeContinuousHandler(resetContinuousRefreshHandler)); } private ResultFetcher callDropMaterializedTableOperation( @@ -1024,7 +1030,7 @@ public class MaterializedTableManager { JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); if (!jobStatus.isTerminalState()) { try { - cancelJob(operationExecutor, handle, refreshHandler.getJobId()); + cancelJob(operationExecutor, handle, refreshHandler); } catch (Exception e) { jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); if (!jobStatus.isTerminalState()) { @@ -1114,7 +1120,7 @@ public class MaterializedTableManager { ContinuousRefreshHandler refreshHandler) { ResultFetcher resultFetcher = operationExecutor.callDescribeJobOperation( - operationExecutor.getTableEnvironment(), + getTableEnvironment(operationExecutor, refreshHandler), handle, new DescribeJobOperation(refreshHandler.getJobId())); List<RowData> result = fetchAllResults(resultFetcher); @@ -1123,31 +1129,51 @@ public class MaterializedTableManager { } private static void cancelJob( - OperationExecutor operationExecutor, OperationHandle handle, String jobId) { + OperationExecutor operationExecutor, + OperationHandle handle, + ContinuousRefreshHandler refreshHandler) { operationExecutor.callStopJobOperation( - 
operationExecutor.getTableEnvironment(), + getTableEnvironment(operationExecutor, refreshHandler), handle, - new StopJobOperation(jobId, false, false)); + new StopJobOperation(refreshHandler.getJobId(), false, false)); } private static String stopJobWithSavepoint( - OperationExecutor executor, OperationHandle handle, String jobId) { + OperationExecutor executor, + OperationHandle handle, + ContinuousRefreshHandler refreshHandler) { // check savepoint dir is configured Optional<String> savepointDir = executor.getSessionContext().getSessionConf().getOptional(SAVEPOINT_DIRECTORY); - if (!savepointDir.isPresent()) { + if (savepointDir.isEmpty()) { throw new ValidationException( "Savepoint directory is not configured, can't stop job with savepoint."); } + String jobId = refreshHandler.getJobId(); ResultFetcher resultFetcher = executor.callStopJobOperation( - executor.getTableEnvironment(), + getTableEnvironment(executor, refreshHandler), handle, new StopJobOperation(jobId, true, false)); List<RowData> results = fetchAllResults(resultFetcher); return results.get(0).getString(0).toString(); } + private static TableEnvironmentInternal getTableEnvironment( + OperationExecutor executor, ContinuousRefreshHandler refreshHandler) { + + String target = refreshHandler.getExecutionTarget(); + Configuration sessionConfiguration = new Configuration(); + sessionConfiguration.set(TARGET, target); + Optional<String> clusterIdKeyName = getClusterIdKeyName(target); + clusterIdKeyName.ifPresent( + s -> sessionConfiguration.setString(s, refreshHandler.getClusterId())); + + return executor.getTableEnvironment( + executor.getSessionContext().getSessionState().resourceManager, + sessionConfiguration); + } + private ContinuousRefreshHandler deserializeContinuousHandler(byte[] serializedRefreshHandler) { try { return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize( @@ -1244,4 +1270,107 @@ public class MaterializedTableManager { } return results; } + + private static JobExecutionResult executeRefreshJob( + String script, + Configuration executionConfig, + OperationExecutor operationExecutor, + OperationHandle operationHandle) { + String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET); + if (executeTarget == null || executeTarget.isEmpty() || "local".equals(executeTarget)) { + String errorMessage = + String.format( + "Unsupported execution target detected: %s." + + "Currently, only the following execution targets are supported: " + + "'remote', 'yarn-session', 'yarn-application', 'kubernetes-session', 'kubernetes-application'. ", + executeTarget); + LOG.error(errorMessage); + throw new ValidationException(errorMessage); + } + + if (executeTarget.endsWith("application")) { + return executeApplicationJob(script, executionConfig, operationExecutor); + } else { + return executeNonApplicationJob( + script, executionConfig, operationExecutor, operationHandle); + } + } + + private static JobExecutionResult executeNonApplicationJob( + String script, + Configuration executionConfig, + OperationExecutor operationExecutor, + OperationHandle operationHandle) { + String executeTarget = operationExecutor.getSessionContext().getSessionConf().get(TARGET); + String clusterId = + operationExecutor + .getSessionClusterId() + .orElseThrow( + () -> { + String errorMessage = + String.format( + "No cluster ID found when executing materialized table refresh job. 
Execution target is : %s", + executeTarget); + LOG.error(errorMessage); + return new ValidationException(errorMessage); + }); + + ResultFetcher resultFetcher = + operationExecutor.executeStatement(operationHandle, executionConfig, script); + List<RowData> results = fetchAllResults(resultFetcher); + String jobId = results.get(0).getString(0).toString(); + + return new JobExecutionResult(executeTarget, clusterId, jobId); + } + + private static JobExecutionResult executeApplicationJob( + String script, Configuration executionConfig, OperationExecutor operationExecutor) { + List<String> arguments = new ArrayList<>(); + arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt()); + arguments.add(script); + + Configuration mergedConfig = + new Configuration(operationExecutor.getSessionContext().getSessionConf()); + mergedConfig.addAll(executionConfig); + JobID jobId = new JobID(); + mergedConfig.set(PIPELINE_FIXED_JOB_ID, jobId.toString()); + + ApplicationConfiguration applicationConfiguration = + new ApplicationConfiguration( + arguments.toArray(new String[0]), SqlDriver.class.getName()); + try { + String clusterId = + new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader()) + .run(mergedConfig, applicationConfiguration) + .toString(); + + return new JobExecutionResult(mergedConfig.get(TARGET), clusterId, jobId.toString()); + } catch (Throwable t) { + LOG.error("Failed to deploy script {} to application cluster.", script, t); + throw new SqlGatewayException("Failed to deploy script to cluster.", t); + } + } + + private static class JobExecutionResult { + + private final String executionTarget; + private final String clusterId; + private final String jobId; + + public JobExecutionResult(String executionTarget, String clusterId, String jobId) { + this.executionTarget = executionTarget; + this.clusterId = clusterId; + this.jobId = jobId; + } + } + + private static Optional<String> getClusterIdKeyName(String targetName) { + if (targetName.startsWith("yarn")) { + return Optional.of("yarn.application.id"); + } else if (targetName.startsWith("kubernetes")) { + return Optional.of("kubernetes.cluster-id"); + } else { + return Optional.empty(); + } + } } diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java index e89e65f654fb..9612b57fccab 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java @@ -416,6 +416,14 @@ public class OperationExecutor { sessionContext.getSessionState().functionCatalog.copy(resourceManager)); } + public <ClusterID> Optional<String> getSessionClusterId() { + ClusterClientFactory<ClusterID> clusterClientFactory = + clusterClientServiceLoader.getClusterClientFactory(sessionContext.getSessionConf()); + ClusterID clusterID = clusterClientFactory.getClusterId(sessionContext.getSessionConf()); + + return Optional.ofNullable(clusterID).map(ClusterID::toString); + } + private static Executor lookupExecutor( StreamExecutionEnvironment executionEnvironment, ClassLoader userClassLoader) { try { diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java 
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java index 784ff40d15a3..8fdb1eac326b 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java @@ -145,7 +145,8 @@ class CatalogBaseTableResolutionTest { "primary_constraint", Collections.singletonList("id"))); private static final ContinuousRefreshHandler CONTINUOUS_REFRESH_HANDLER = - new ContinuousRefreshHandler("remote", JobID.generate().toHexString()); + new ContinuousRefreshHandler( + "remote", "StandaloneClusterId", JobID.generate().toHexString()); private static final String DEFINITION_QUERY = String.format( diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java index 2798f4051e46..19eff424dd47 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java @@ -31,20 +31,22 @@ public class ContinuousRefreshHandler implements RefreshHandler, Serializable { private static final long serialVersionUID = 1L; - // TODO: add clusterId for yarn and k8s resource manager private final String executionTarget; + private final String clusterId; private final String jobId; + private final @Nullable String restorePath; - private @Nullable final String restorePath; - - public ContinuousRefreshHandler(String executionTarget, String jobId) { + public ContinuousRefreshHandler(String executionTarget, String clusterId, String jobId) { this.executionTarget = executionTarget; + this.clusterId = clusterId; this.jobId = jobId; this.restorePath = null; } - public ContinuousRefreshHandler(String executionTarget, String jobId, String restorePath) { + public ContinuousRefreshHandler( + String executionTarget, String clusterId, String jobId, String restorePath) { this.executionTarget = executionTarget; + this.clusterId = clusterId; this.jobId = jobId; this.restorePath = restorePath; } @@ -57,6 +59,10 @@ public class ContinuousRefreshHandler implements RefreshHandler, Serializable { return jobId; } + public String getClusterId() { + return clusterId; + } + public Optional<String> getRestorePath() { return Optional.ofNullable(restorePath); } @@ -64,9 +70,10 @@ public class ContinuousRefreshHandler implements RefreshHandler, Serializable { @Override public String asSummaryString() { return String.format( - "{\njobId=%s,\n executionTarget=%s%s\n}", - jobId, + "{\n executionTarget=%s,\n clusterId=%s,\n jobId=%s%s\n}", executionTarget, + clusterId, + jobId, restorePath == null ? "" : ",\n restorePath=" + restorePath); } }
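
For illustration only (not part of the patch): the execution target plus cluster id now carried by ContinuousRefreshHandler are sufficient to rebuild a deployment-specific Configuration, which is what the new getTableEnvironment(...) path in MaterializedTableManager relies on when suspending, resuming, or describing a refresh job. A minimal sketch under that assumption; resolveClusterIdKey below is a hypothetical local stand-in for the committed getClusterIdKeyName helper:

    // Illustrative sketch, not committed code.
    import java.util.Optional;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.table.refresh.ContinuousRefreshHandler;

    class RefreshHandlerConfigSketch {

        // Hypothetical helper mirroring MaterializedTableManager#getClusterIdKeyName.
        static Optional<String> resolveClusterIdKey(String target) {
            if (target.startsWith("yarn")) {
                return Optional.of("yarn.application.id");
            } else if (target.startsWith("kubernetes")) {
                return Optional.of("kubernetes.cluster-id");
            }
            return Optional.empty();
        }

        // Rebuilds the deployment configuration from the persisted handler state.
        static Configuration toConfiguration(ContinuousRefreshHandler handler) {
            Configuration conf = new Configuration();
            conf.set(DeploymentOptions.TARGET, handler.getExecutionTarget());
            resolveClusterIdKey(handler.getExecutionTarget())
                    .ifPresent(key -> conf.setString(key, handler.getClusterId()));
            return conf;
        }
    }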