This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 30845617fe1 [FLINK-36759][sql-gateway] Add REST API to deploy script in application mode (#25730) 30845617fe1 is described below commit 30845617fe12f3532228a2bf5a1539dc7cba4b30 Author: Shengkai <33114724+fsk...@users.noreply.github.com> AuthorDate: Sat Jan 4 23:17:40 2025 +0800 [FLINK-36759][sql-gateway] Add REST API to deploy script in application mode (#25730) --- .../flink/client/cli/ApplicationDeployer.java | 2 +- .../cli/ApplicationClusterDeployer.java | 8 +- .../flink/client/program/TestingClusterClient.java | 2 +- flink-end-to-end-tests/run-nightly-tests.sh | 1 + flink-end-to-end-tests/test-scripts/common.sh | 6 + .../test_kubernetes_sql_application.sh | 78 +++++ .../flink-sql-client/src/test/resources/sql/set.q | 137 ++++---- .../flink/table/gateway/api/SqlGatewayService.java | 21 ++ .../gateway/api/utils/MockedSqlGatewayService.java | 11 + .../table/gateway/rest/SqlGatewayRestEndpoint.java | 11 + .../handler/application/DeployScriptHandler.java | 75 +++++ .../header/application/DeployScriptHeaders.java | 91 +++++ .../application/DeployScriptRequestBody.java | 71 ++++ .../application/DeployScriptResponseBody.java | 42 +++ .../gateway/service/SqlGatewayServiceImpl.java | 47 +++ .../gateway/service/context/DefaultContext.java | 2 + .../table/gateway/rest/DeployScriptITCase.java | 371 +++++++++++++++++++++ .../service/application/ScriptRunnerITCase.java | 4 +- .../service/utils/SqlGatewayServiceExtension.java | 13 +- ...he.flink.client.deployment.ClusterClientFactory | 16 + .../flink-sql-gateway/src/test/resources/sql/set.q | 50 +-- .../resources/sql_gateway_rest_api_v4.snapshot | 40 +++ .../flink/table/runtime/application/SqlDriver.java | 16 +- 23 files changed, 1009 insertions(+), 106 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java index f09faccf3fc..92de8afb1e0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ApplicationDeployer.java @@ -34,7 +34,7 @@ public interface ApplicationDeployer { * @param applicationConfiguration an {@link ApplicationConfiguration} specific to the * application to be executed. */ - <ClusterID> void run( + <ClusterID> ClusterID run( final Configuration configuration, final ApplicationConfiguration applicationConfiguration) throws Exception; diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java index b62060f77b6..e59bb22505d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/cli/ApplicationClusterDeployer.java @@ -48,7 +48,7 @@ public class ApplicationClusterDeployer implements ApplicationDeployer { this.clientServiceLoader = checkNotNull(clientServiceLoader); } - public <ClusterID> void run( + public <ClusterID> ClusterID run( final Configuration configuration, final ApplicationConfiguration applicationConfiguration) throws Exception { @@ -64,8 +64,10 @@ public class ApplicationClusterDeployer implements ApplicationDeployer { final ClusterSpecification clusterSpecification = clientFactory.getClusterSpecification(configuration); - clusterDescriptor.deployApplicationCluster( - clusterSpecification, applicationConfiguration); + return clusterDescriptor + .deployApplicationCluster(clusterSpecification, applicationConfiguration) + .getClusterClient() + .getClusterId(); } } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java index 0fa803ea8b9..0e31c24af94 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java @@ -113,7 +113,7 @@ public class TestingClusterClient<T> implements ClusterClient<T> { @Override public T getClusterId() { - throw new UnsupportedOperationException(); + return (T) "test-cluster"; } @Override diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 963903c3257..dcd1fe633d8 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -181,6 +181,7 @@ function run_group_2 { run_test "Streaming SQL end-to-end test using planner loader" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions" run_test "Streaming SQL end-to-end test using planner with Scala version" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh scala-planner" "skip_check_exceptions" run_test "Sql Jdbc Driver end-to-end test" "$END_TO_END_DIR/test-scripts/test_sql_jdbc_driver.sh" "skip_check_exceptions" + run_test "Run kubernetes SQL application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_sql_application.sh" run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh local StreamingFileSink" "skip_check_exceptions" run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_file_sink.sh s3 StreamingFileSink" "skip_check_exceptions" diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index fe24d0ef01a..ab13b7e57ed 100755 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -533,10 +533,12 @@ function check_logs_for_non_empty_out_files { function shutdown_all { stop_cluster + stop_sql_gateway # stop TMs which started by command: bin/taskmanager.sh start "$FLINK_DIR"/bin/taskmanager.sh stop-all tm_kill_all jm_kill_all + gw_kill_all } function stop_cluster { @@ -683,6 +685,10 @@ function tm_kill_all { kill_all 'TaskManagerRunner|TaskManager' } +function gw_kill_all { + kill_all 'SqlGateway' +} + # Kills all processes that match the given name. function kill_all { local pid=`jps | grep -E "${1}" | cut -d " " -f 1 || true` diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_sql_application.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_sql_application.sh new file mode 100755 index 00000000000..e109c9ffc84 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_sql_application.sh @@ -0,0 +1,78 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source "$(dirname "$0")"/common_kubernetes.sh + +CURRENT_DIR=`cd "$(dirname "$0")" && pwd -P` +CLUSTER_ROLE_BINDING="flink-role-binding-default" +CLUSTER_ID="flink-native-k8s-sql-application-1" +FLINK_IMAGE_NAME="test_kubernetes_application-1" +LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log" +IMAGE_BUILD_RETRIES=3 +IMAGE_BUILD_BACKOFF=2 + +function internal_cleanup { + kubectl delete deployment ${CLUSTER_ID} + kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING} +} + +start_kubernetes + +if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image ${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then + echo "ERROR: Could not build image. Aborting..." + exit 1 +fi + +kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit --serviceaccount=default:default --namespace=default + +mkdir -p "$LOCAL_LOGS_PATH" + +echo "[INFO] Start SQL Gateway" +set_config_key "sql-gateway.endpoint.rest.address" "localhost" +start_sql_gateway + +echo "[INFO] Submit SQL job in Application Mode" +SESSION_HANDLE=`curl --silent --request POST http://localhost:8083/sessions | sed -n 's/.*"sessionHandle":\s*"\([^"]*\)".*/\1/p'` +curl --location --request POST http://localhost:8083/sessions/${SESSION_HANDLE}/scripts \ +--header 'Content-Type: application/json' \ +--data-raw '{ + "script": "CREATE TEMPORARY TABLE sink(a INT) WITH ( '\''connector'\'' = '\''blackhole'\''); INSERT INTO sink VALUES (1), (2), (3);", + "executionConfig": { + "execution.target": "kubernetes-application", + "kubernetes.cluster-id": "'${CLUSTER_ID}'", + "kubernetes.container.image.ref": "'${FLINK_IMAGE_NAME}'", + "jobmanager.memory.process.size": "1088m", + "taskmanager.memory.process.size": "1000m", + "kubernetes.jobmanager.cpu": 0.5, + "kubernetes.taskmanager.cpu": 0.5, + "kubernetes.rest-service.exposed.type": "NodePort" + } +}' + +echo "" +echo "[INFO] Wait job finishes" +kubectl wait --for=condition=Available --timeout=60s deploy/${CLUSTER_ID} || exit 1 +jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}') +wait_rest_endpoint_up_k8s $jm_pod_name + +# The Flink cluster will be destroyed immediately once the job finished or failed. So we check jobmanager logs +# instead of checking the result +echo "[INFO] Check logs to verify job finishes" +kubectl logs -f $jm_pod_name >$LOCAL_LOGS_PATH/jobmanager.log +grep -E "Job [A-Za-z0-9]+ reached terminal state FINISHED" $LOCAL_LOGS_PATH/jobmanager.log diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q index 05f79218cb2..4189a9588a2 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/set.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q @@ -74,21 +74,22 @@ reset 'table.resources.download-dir'; # list the configured configuration set; -+-------------------------------------------------+-----------+ -| key | value | -+-------------------------------------------------+-----------+ -| execution.attached | true | -| execution.shutdown-on-attached-exit | false | -| execution.state-recovery.claim-mode | NO_CLAIM | -| execution.state-recovery.ignore-unclaimed-state | false | -| execution.target | remote | -| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | -| rest.port | $VAR_REST_PORT | -| sql-client.display.print-time-cost | false | -| sql-client.execution.result-mode | tableau | -| table.exec.legacy-cast-behaviour | DISABLED | -+-------------------------------------------------+-----------+ -10 rows in set ++-------------------------------------------------+--------------+ +| key | value | ++-------------------------------------------------+--------------+ +| $internal.deployment.config-dir | /dummy/conf/ | +| execution.attached | true | +| execution.shutdown-on-attached-exit | false | +| execution.state-recovery.claim-mode | NO_CLAIM | +| execution.state-recovery.ignore-unclaimed-state | false | +| execution.target | remote | +| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | +| rest.port | $VAR_REST_PORT | +| sql-client.display.print-time-cost | false | +| sql-client.execution.result-mode | tableau | +| table.exec.legacy-cast-behaviour | DISABLED | ++-------------------------------------------------+--------------+ +11 rows in set !ok # reset the configuration @@ -97,18 +98,19 @@ reset; !info set; -+-------------------------------------------------+-----------+ -| key | value | -+-------------------------------------------------+-----------+ -| execution.attached | true | -| execution.shutdown-on-attached-exit | false | -| execution.state-recovery.claim-mode | NO_CLAIM | -| execution.state-recovery.ignore-unclaimed-state | false | -| execution.target | remote | -| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | -| rest.port | $VAR_REST_PORT | -+-------------------------------------------------+-----------+ -7 rows in set ++-------------------------------------------------+--------------+ +| key | value | ++-------------------------------------------------+--------------+ +| $internal.deployment.config-dir | /dummy/conf/ | +| execution.attached | true | +| execution.shutdown-on-attached-exit | false | +| execution.state-recovery.claim-mode | NO_CLAIM | +| execution.state-recovery.ignore-unclaimed-state | false | +| execution.target | remote | +| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | +| rest.port | $VAR_REST_PORT | ++-------------------------------------------------+--------------+ +8 rows in set !ok # should fail because default dialect doesn't support hive dialect @@ -136,19 +138,20 @@ set 'sql-client.verbose' = 'true'; !info set; -+-------------------------------------------------+-----------+ -| key | value | -+-------------------------------------------------+-----------+ -| execution.attached | true | -| execution.shutdown-on-attached-exit | false | -| execution.state-recovery.claim-mode | NO_CLAIM | -| execution.state-recovery.ignore-unclaimed-state | false | -| execution.target | remote | -| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | -| rest.port | $VAR_REST_PORT | -| sql-client.verbose | true | -+-------------------------------------------------+-----------+ -8 rows in set ++-------------------------------------------------+--------------+ +| key | value | ++-------------------------------------------------+--------------+ +| $internal.deployment.config-dir | /dummy/conf/ | +| execution.attached | true | +| execution.shutdown-on-attached-exit | false | +| execution.state-recovery.claim-mode | NO_CLAIM | +| execution.state-recovery.ignore-unclaimed-state | false | +| execution.target | remote | +| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | +| rest.port | $VAR_REST_PORT | +| sql-client.verbose | true | ++-------------------------------------------------+--------------+ +9 rows in set !ok set 'execution.attached' = 'false'; @@ -160,19 +163,20 @@ reset 'execution.attached'; !info set; -+-------------------------------------------------+-----------+ -| key | value | -+-------------------------------------------------+-----------+ -| execution.attached | true | -| execution.shutdown-on-attached-exit | false | -| execution.state-recovery.claim-mode | NO_CLAIM | -| execution.state-recovery.ignore-unclaimed-state | false | -| execution.target | remote | -| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | -| rest.port | $VAR_REST_PORT | -| sql-client.verbose | true | -+-------------------------------------------------+-----------+ -8 rows in set ++-------------------------------------------------+--------------+ +| key | value | ++-------------------------------------------------+--------------+ +| $internal.deployment.config-dir | /dummy/conf/ | +| execution.attached | true | +| execution.shutdown-on-attached-exit | false | +| execution.state-recovery.claim-mode | NO_CLAIM | +| execution.state-recovery.ignore-unclaimed-state | false | +| execution.target | remote | +| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | +| rest.port | $VAR_REST_PORT | +| sql-client.verbose | true | ++-------------------------------------------------+--------------+ +9 rows in set !ok # test reset can work with add jar @@ -190,19 +194,20 @@ SHOW JARS; !ok set; -+-------------------------------------------------+-----------+ -| key | value | -+-------------------------------------------------+-----------+ -| execution.attached | true | -| execution.shutdown-on-attached-exit | false | -| execution.state-recovery.claim-mode | NO_CLAIM | -| execution.state-recovery.ignore-unclaimed-state | false | -| execution.target | remote | -| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | -| rest.port | $VAR_REST_PORT | -| sql-client.verbose | true | -+-------------------------------------------------+-----------+ -8 rows in set ++-------------------------------------------------+--------------+ +| key | value | ++-------------------------------------------------+--------------+ +| $internal.deployment.config-dir | /dummy/conf/ | +| execution.attached | true | +| execution.shutdown-on-attached-exit | false | +| execution.state-recovery.claim-mode | NO_CLAIM | +| execution.state-recovery.ignore-unclaimed-state | false | +| execution.target | remote | +| jobmanager.rpc.address | $VAR_JOBMANAGER_RPC_ADDRESS | +| rest.port | $VAR_REST_PORT | +| sql-client.verbose | true | ++-------------------------------------------------+--------------+ +9 rows in set !ok reset; diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java index c19843a4d53..bb69fdc658d 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java @@ -41,6 +41,7 @@ import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import javax.annotation.Nullable; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.Set; @@ -347,4 +348,24 @@ public interface SqlGatewayService { Map<String, String> dynamicOptions, Map<String, String> staticPartitions, Map<String, String> executionConfig); + + // ------------------------------------------------------------------------------------------- + // Deploy Script + // ------------------------------------------------------------------------------------------- + + /** + * Deploy the script in application mode. + * + * @param sessionHandle handle to identify the session. + * @param scriptUri URI of the script. + * @param script the content of the script. + * @param executionConfig to run the script. + * @return the cluster description. + */ + <ClusterID> ClusterID deployScript( + SessionHandle sessionHandle, + @Nullable URI scriptUri, + @Nullable String script, + Configuration executionConfig) + throws SqlGatewayException; } diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java index 348cf80df16..d7b45779629 100644 --- a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/utils/MockedSqlGatewayService.java @@ -39,6 +39,7 @@ import org.apache.flink.table.gateway.api.session.SessionHandle; import javax.annotation.Nullable; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.Set; @@ -201,6 +202,16 @@ public class MockedSqlGatewayService implements SqlGatewayService { throw new UnsupportedOperationException(); } + @Override + public <ClusterID> ClusterID deployScript( + SessionHandle sessionHandle, + @Nullable URI scriptUri, + @Nullable String script, + Configuration executionConfig) + throws SqlGatewayException { + throw new UnsupportedOperationException(); + } + @Override public ResolvedCatalogBaseTable<?> getTable( SessionHandle sessionHandle, ObjectIdentifier tableIdentifier) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java index 2fa462ade85..e48df09a64d 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpoint; +import org.apache.flink.table.gateway.rest.handler.application.DeployScriptHandler; import org.apache.flink.table.gateway.rest.handler.materializedtable.RefreshMaterializedTableHandler; import org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHandler; import org.apache.flink.table.gateway.rest.handler.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHandler; @@ -43,6 +44,7 @@ import org.apache.flink.table.gateway.rest.handler.statement.ExecuteStatementHan import org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler; import org.apache.flink.table.gateway.rest.handler.util.GetApiVersionHandler; import org.apache.flink.table.gateway.rest.handler.util.GetInfoHandler; +import org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders; import org.apache.flink.table.gateway.rest.header.materializedtable.RefreshMaterializedTableHeaders; import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.CreateEmbeddedSchedulerWorkflowHeaders; import org.apache.flink.table.gateway.rest.header.materializedtable.scheduler.DeleteEmbeddedSchedulerWorkflowHeaders; @@ -99,6 +101,7 @@ public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGat addStatementRelatedHandlers(handlers); addEmbeddedSchedulerRelatedHandlers(handlers); addMaterializedTableRelatedHandlers(handlers); + addDeployScriptRelatedHandlers(handlers); return handlers; } @@ -257,6 +260,14 @@ public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGat refreshMaterializedTableHandler)); } + private void addDeployScriptRelatedHandlers( + List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers) { + DeployScriptHandler handler = + new DeployScriptHandler( + service, responseHeaders, DeployScriptHeaders.getInstance()); + handlers.add(Tuple2.of(DeployScriptHeaders.getInstance(), handler)); + } + @Override protected void startInternal() { quartzScheduler.start(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java new file mode 100644 index 00000000000..120fb278750 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/application/DeployScriptHandler.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.handler.application; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler; +import org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody; +import org.apache.flink.table.gateway.rest.message.application.DeployScriptResponseBody; +import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** Handler to deploy the script in application mode. */ +public class DeployScriptHandler + extends AbstractSqlGatewayRestHandler< + DeployScriptRequestBody, DeployScriptResponseBody, SessionMessageParameters> { + + public DeployScriptHandler( + SqlGatewayService service, + Map<String, String> responseHeaders, + MessageHeaders< + DeployScriptRequestBody, + DeployScriptResponseBody, + SessionMessageParameters> + messageHeaders) { + super(service, responseHeaders, messageHeaders); + } + + @Override + protected CompletableFuture<DeployScriptResponseBody> handleRequest( + @Nullable SqlGatewayRestAPIVersion version, + @Nonnull HandlerRequest<DeployScriptRequestBody> request) + throws RestHandlerException { + return CompletableFuture.completedFuture( + new DeployScriptResponseBody( + service.deployScript( + request.getPathParameter( + SessionHandleIdPathParameter.class), + request.getRequestBody().getScriptUri() == null + ? null + : URI.create( + request.getRequestBody().getScriptUri()), + request.getRequestBody().getScript(), + Configuration.fromMap( + request.getRequestBody().getExecutionConfig())) + .toString())); + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/application/DeployScriptHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/application/DeployScriptHeaders.java new file mode 100644 index 00000000000..54c1ea5f310 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/application/DeployScriptHeaders.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.header.application; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; +import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders; +import org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody; +import org.apache.flink.table.gateway.rest.message.application.DeployScriptResponseBody; +import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.Collection; + +/** Message headers for run the script in application mode. */ +public class DeployScriptHeaders + implements SqlGatewayMessageHeaders< + DeployScriptRequestBody, DeployScriptResponseBody, SessionMessageParameters> { + + private static final DeployScriptHeaders INSTANCE = new DeployScriptHeaders(); + + private static final String URL = "/sessions/:" + SessionHandleIdPathParameter.KEY + "/scripts"; + + public static DeployScriptHeaders getInstance() { + return INSTANCE; + } + + @Override + public Class<DeployScriptResponseBody> getResponseClass() { + return DeployScriptResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public String getDescription() { + return "Deploy the script in application mode"; + } + + @Override + public Class<DeployScriptRequestBody> getRequestClass() { + return DeployScriptRequestBody.class; + } + + @Override + public SessionMessageParameters getUnresolvedMessageParameters() { + return new SessionMessageParameters(); + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.POST; + } + + @Override + public String getTargetRestEndpointURL() { + return URL; + } + + @Override + public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() { + return SqlGatewayRestAPIVersion.getHigherVersions(SqlGatewayRestAPIVersion.V3); + } + + @Override + public String operationId() { + return "deployScript"; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java new file mode 100644 index 00000000000..bc30a5b3e10 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptRequestBody.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.message.application; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.Map; + +/** Request to deploy a script in application mode. */ +public class DeployScriptRequestBody implements RequestBody { + + private static final String FIELD_NAME_SCRIPT = "script"; + private static final String FIELD_NAME_SCRIPT_URI = "scriptUri"; + private static final String FIELD_NAME_EXECUTION_CONFIG = "executionConfig"; + + @JsonProperty(FIELD_NAME_SCRIPT) + private final @Nullable String script; + + @JsonProperty(FIELD_NAME_SCRIPT_URI) + private final @Nullable String scriptUri; + + @JsonProperty(FIELD_NAME_EXECUTION_CONFIG) + private final Map<String, String> executionConfig; + + @JsonCreator + public DeployScriptRequestBody( + @JsonProperty(FIELD_NAME_SCRIPT) @Nullable String script, + @JsonProperty(FIELD_NAME_SCRIPT_URI) @Nullable String scriptUri, + @JsonProperty(FIELD_NAME_EXECUTION_CONFIG) @Nullable + Map<String, String> executionConfig) { + this.script = script; + this.scriptUri = scriptUri; + this.executionConfig = executionConfig == null ? Collections.emptyMap() : executionConfig; + } + + public Map<String, String> getExecutionConfig() { + return executionConfig; + } + + @Nullable + public String getScript() { + return script; + } + + @Nullable + public String getScriptUri() { + return scriptUri; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptResponseBody.java new file mode 100644 index 00000000000..3f0126eae25 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/application/DeployScriptResponseBody.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest.message.application; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +/** Response about the cluster that runs the application. */ +public class DeployScriptResponseBody implements ResponseBody { + + private static final String FIELD_NAME_CLUSTER_ID = "clusterID"; + + @JsonProperty(FIELD_NAME_CLUSTER_ID) + private final String clusterID; + + @JsonCreator + public DeployScriptResponseBody(@JsonProperty(FIELD_NAME_CLUSTER_ID) String clusterID) { + this.clusterID = clusterID; + } + + public String getClusterID() { + return clusterID; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java index e88f0212362..371d8aa8993 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java @@ -19,6 +19,9 @@ package org.apache.flink.table.gateway.service; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.CatalogBaseTable.TableKind; import org.apache.flink.table.catalog.ObjectIdentifier; @@ -42,12 +45,16 @@ import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import org.apache.flink.table.gateway.service.operation.OperationManager; import org.apache.flink.table.gateway.service.session.Session; import org.apache.flink.table.gateway.service.session.SessionManager; +import org.apache.flink.table.runtime.application.SqlDriver; +import org.apache.flink.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.net.URI; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -330,6 +337,46 @@ public class SqlGatewayServiceImpl implements SqlGatewayService { } } + @Override + public <ClusterID> ClusterID deployScript( + SessionHandle sessionHandle, + @Nullable URI scriptUri, + @Nullable String script, + Configuration executionConfig) + throws SqlGatewayException { + Session session = sessionManager.getSession(sessionHandle); + if (scriptUri == null && script == null) { + throw new IllegalArgumentException("Please specify script content or uri."); + } + if (scriptUri != null && !StringUtils.isNullOrWhitespaceOnly(script)) { + throw new IllegalArgumentException( + "Please specify either the script uri or the script content, but not both."); + } + Configuration mergedConfig = Configuration.fromMap(session.getSessionConfig()); + mergedConfig.addAll(executionConfig); + + List<String> arguments = new ArrayList<>(); + if (scriptUri != null) { + arguments.add("--" + SqlDriver.OPTION_SQL_FILE.getLongOpt()); + arguments.add(scriptUri.toString()); + } + if (script != null) { + arguments.add("--" + SqlDriver.OPTION_SQL_SCRIPT.getLongOpt()); + arguments.add(script); + } + + ApplicationConfiguration applicationConfiguration = + new ApplicationConfiguration( + arguments.toArray(new String[0]), SqlDriver.class.getName()); + try { + return new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader()) + .run(mergedConfig, applicationConfiguration); + } catch (Throwable t) { + LOG.error("Failed to deploy script to cluster.", t); + throw new SqlGatewayException("Failed to deploy script to cluster.", t); + } + } + @Override public Set<FunctionInfo> listUserDefinedFunctions( SessionHandle sessionHandle, String catalogName, String databaseName) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java index eae416cd0a6..78eccc7337d 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java @@ -25,6 +25,7 @@ import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.client.cli.ExecutionConfigAccessor; import org.apache.flink.client.cli.ProgramOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptionsInternal; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.plugin.PluginUtils; @@ -141,6 +142,7 @@ public class DefaultContext { // 2. load the global configuration Configuration configuration = GlobalConfiguration.loadConfiguration(flinkConfigDir); + configuration.set(DeploymentOptionsInternal.CONF_DIR, flinkConfigDir); configuration.addAll(dynamicConfig); // 3. load the custom command lines diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java new file mode 100644 index 00000000000..3ed3c80c169 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/DeployScriptITCase.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.rest; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.client.deployment.ClusterClientFactory; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptions; +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.client.JobStatusMessage; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmaster.JobResult; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.operators.coordination.CoordinationRequest; +import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.streaming.api.graph.ExecutionPlan; +import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.rest.header.application.DeployScriptHeaders; +import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders; +import org.apache.flink.table.gateway.rest.message.application.DeployScriptRequestBody; +import org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody; +import org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters; +import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension; +import org.apache.flink.table.gateway.rest.util.TestingRestClient; +import org.apache.flink.table.gateway.service.utils.MockHttpServer; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; +import org.apache.flink.table.runtime.application.SqlDriver; +import org.apache.flink.util.FileUtils; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; + +import javax.annotation.Nullable; + +import java.io.File; +import java.net.URL; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** ITCase to test deploy the script into application mode. */ +public class DeployScriptITCase { + + @RegisterExtension + @Order(1) + public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = + new SqlGatewayServiceExtension(Configuration::new); + + @RegisterExtension + @Order(2) + private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION = + new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService); + + private static TestingRestClient restClient; + private static SessionHandle sessionHandle; + private static final String script = + "CREATE TEMPORARY TABLE sink(\n" + + " a INT\n" + + ") WITH (\n" + + " 'connector' = 'blackhole'\n" + + ");\n" + + "INSERT INTO sink VALUES (1);"; + + @BeforeAll + static void beforeAll() throws Exception { + restClient = TestingRestClient.getTestingRestClient(); + sessionHandle = + new SessionHandle( + UUID.fromString( + restClient + .sendRequest( + SQL_GATEWAY_REST_ENDPOINT_EXTENSION + .getTargetAddress(), + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(), + OpenSessionHeaders.getInstance(), + EmptyMessageParameters.getInstance(), + new OpenSessionRequestBody( + "test", + Collections.singletonMap("key", "value"))) + .get() + .getSessionHandle())); + } + + @Test + void testDeployScriptToYarnCluster(@TempDir Path workDir) throws Exception { + verifyDeployScriptToCluster("yarn-application", script, null, script); + try (MockHttpServer server = MockHttpServer.startHttpServer()) { + File file = workDir.resolve("script.sql").toFile(); + assertThat(file.createNewFile()).isTrue(); + FileUtils.writeFileUtf8(file, script); + URL url = server.prepareResource("/download/script.sql", file); + verifyDeployScriptToCluster("yarn-application", null, url.toString(), script); + } + } + + @Test + void testDeployScriptToKubernetesCluster(@TempDir Path workDir) throws Exception { + verifyDeployScriptToCluster("kubernetes-application", script, null, script); + try (MockHttpServer server = MockHttpServer.startHttpServer()) { + File file = workDir.resolve("script.sql").toFile(); + assertThat(file.createNewFile()).isTrue(); + FileUtils.writeFileUtf8(file, script); + URL url = server.prepareResource("/download/script.sql", file); + verifyDeployScriptToCluster("kubernetes-application", null, url.toString(), script); + } + } + + private void verifyDeployScriptToCluster( + String target, @Nullable String script, @Nullable String scriptUri, String content) + throws Exception { + TestApplicationClusterClientFactory.id = target; + + assertThat( + restClient + .sendRequest( + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(), + SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(), + DeployScriptHeaders.getInstance(), + new SessionMessageParameters(sessionHandle), + new DeployScriptRequestBody( + script, + scriptUri, + Collections.singletonMap( + DeploymentOptions.TARGET.key(), target))) + .get() + .getClusterID()) + .isEqualTo("test"); + ApplicationConfiguration config = TestApplicationClusterDescriptor.applicationConfiguration; + assertThat(TestApplicationClusterClientFactory.configuration.getString("key", "none")) + .isEqualTo("value"); + assertThat(config.getApplicationClassName()).isEqualTo(SqlDriver.class.getName()); + assertThat(SqlDriver.parseOptions(config.getProgramArguments())).isEqualTo(content); + } + + /** + * Test {@link ClusterClientFactory} to capture {@link Configuration} and {@link + * ApplicationConfiguration}. + */ + @SuppressWarnings({"unchecked", "rawTypes"}) + public static class TestApplicationClusterClientFactory<ClusterID> + implements ClusterClientFactory { + + public static String id; + + private static volatile Configuration configuration; + + @Override + public boolean isCompatibleWith(Configuration configuration) { + return Objects.equals(id, configuration.get(DeploymentOptions.TARGET)); + } + + @Override + @SuppressWarnings("unchecked") + public ClusterDescriptor<ClusterID> createClusterDescriptor(Configuration configuration) { + TestApplicationClusterClientFactory.configuration = configuration; + return TestApplicationClusterDescriptor.INSTANCE; + } + + @Override + @Nullable + public String getClusterId(Configuration configuration) { + return "test-application"; + } + + @Override + public ClusterSpecification getClusterSpecification(Configuration configuration) { + return new ClusterSpecification.ClusterSpecificationBuilder() + .createClusterSpecification(); + } + } + + private static class TestApplicationClusterDescriptor<T> implements ClusterDescriptor<T> { + + @SuppressWarnings("rawTypes") + private static final TestApplicationClusterDescriptor INSTANCE = + new TestApplicationClusterDescriptor<>(); + + static volatile ApplicationConfiguration applicationConfiguration; + + private TestApplicationClusterDescriptor() {} + + @Override + public String getClusterDescription() { + return "Test Application Cluster Descriptor"; + } + + @Override + public ClusterClientProvider<T> retrieve(T clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public ClusterClientProvider<T> deploySessionCluster( + ClusterSpecification clusterSpecification) { + throw new UnsupportedOperationException(); + } + + @Override + public ClusterClientProvider<T> deployApplicationCluster( + final ClusterSpecification clusterSpecification, + final ApplicationConfiguration applicationConfiguration) { + TestApplicationClusterDescriptor.applicationConfiguration = applicationConfiguration; + return new ClusterClientProvider<T>() { + @Override + public ClusterClient<T> getClusterClient() { + return new TestClusterClient(); + } + }; + } + + @Override + public ClusterClientProvider<T> deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) { + throw new UnsupportedOperationException(); + } + + @Override + public void killCluster(T clusterId) { + throw new UnsupportedOperationException(); + } + + @Override + public void close() { + // nothing to do + } + } + + @SuppressWarnings("rawTypes") + private static class TestClusterClient implements ClusterClient { + + @Override + public void close() {} + + @Override + public Object getClusterId() { + return "test"; + } + + @Override + public Configuration getFlinkConfiguration() { + throw new UnsupportedOperationException(); + } + + @Override + public void shutDownCluster() { + throw new UnsupportedOperationException(); + } + + @Override + public String getWebInterfaceURL() { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Collection<JobStatusMessage>> listJobs() { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<JobID> submitJob(ExecutionPlan executionPlan) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<JobStatus> getJobStatus(JobID jobId) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<JobResult> requestJobResult(JobID jobId) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Map<String, Object>> getAccumulators( + JobID jobID, ClassLoader loader) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Acknowledge> cancel(JobID jobId) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<String> cancelWithSavepoint( + JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<String> stopWithSavepoint( + JobID jobId, + boolean advanceToEndOfEventTime, + @Nullable String savepointDirectory, + SavepointFormatType formatType) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<String> stopWithDetachedSavepoint( + JobID jobId, + boolean advanceToEndOfEventTime, + @Nullable String savepointDirectory, + SavepointFormatType formatType) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<String> triggerSavepoint( + JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<Long> triggerCheckpoint( + JobID jobId, CheckpointType checkpointType) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<String> triggerDetachedSavepoint( + JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<CoordinationResponse> sendCoordinationRequest( + JobID jobId, String operatorUid, CoordinationRequest request) { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java index 5ddfeed5062..4eb91e0a44b 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/application/ScriptRunnerITCase.java @@ -141,7 +141,7 @@ class ScriptRunnerITCase { udfJar.getAbsolutePath(), GENERATED_LOWER_UDF_CLASS); List<String> arguments = - Arrays.asList("--scriptPath", createStatementFile(workDir, script).toString()); + Arrays.asList("--scriptUri", createStatementFile(workDir, script).toString()); runScriptInCluster(arguments); assertThat(TestValuesTableFactory.getResultsAsStrings("sink")) @@ -166,7 +166,7 @@ class ScriptRunnerITCase { try (MockHttpServer server = MockHttpServer.startHttpServer()) { URL url = server.prepareResource("/download/script.sql", file); - List<String> arguments = Arrays.asList("--scriptPath", url.toString()); + List<String> arguments = Arrays.asList("--scriptUri", url.toString()); runScriptInCluster(arguments); } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java index 47b16a0baf0..9b0690b70e9 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceExtension.java @@ -19,6 +19,7 @@ package org.apache.flink.table.gateway.service.utils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptionsInternal; import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.table.gateway.api.SqlGatewayService; import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl; @@ -35,6 +36,7 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.nio.file.Paths; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -89,7 +91,12 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa sessionManager = sessionManagerCreator.apply( DefaultContext.load( - new Configuration(), Collections.emptyList(), true)); + Configuration.fromMap( + Collections.singletonMap( + DeploymentOptionsInternal.CONF_DIR.key(), + "/dummy/conf/")), + Collections.emptyList(), + true)); } finally { CommonTestUtils.setEnv(originalEnv); } @@ -114,6 +121,10 @@ public class SqlGatewayServiceExtension implements BeforeAllCallback, AfterAllCa return sessionManager; } + public String getConfDir() { + return Paths.get(temporaryFolder.getRoot().getAbsolutePath(), "conf").toString(); + } + private String getFlinkConfContent(Map<String, String> flinkConf) { StringBuilder sb = new StringBuilder(); flinkConf.forEach((k, v) -> sb.append(k).append(": ").append(v).append("\n")); diff --git a/flink-table/flink-sql-gateway/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory b/flink-table/flink-sql-gateway/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory new file mode 100644 index 00000000000..9ea49fe13be --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/resources/META-INF/services/org.apache.flink.client.deployment.ClusterClientFactory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.gateway.rest.DeployScriptITCase$TestApplicationClusterClientFactory \ No newline at end of file diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q index f96ba188492..902e58e3bbb 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/set.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/set.q @@ -27,18 +27,19 @@ reset table.resources.download-dir; set; !output -+-------------------------------------------------+-----------+ -| key | value | -+-------------------------------------------------+-----------+ -| execution.attached | true | -| execution.shutdown-on-attached-exit | false | -| execution.state-recovery.claim-mode | NO_CLAIM | -| execution.state-recovery.ignore-unclaimed-state | false | -| execution.target | remote | -| jobmanager.rpc.address | localhost | -| rest.port | $VAR_REST_PORT | -+-------------------------------------------------+-----------+ -7 rows in set ++-------------------------------------------------+--------------+ +| key | value | ++-------------------------------------------------+--------------+ +| $internal.deployment.config-dir | /dummy/conf/ | +| execution.attached | true | +| execution.shutdown-on-attached-exit | false | +| execution.state-recovery.claim-mode | NO_CLAIM | +| execution.state-recovery.ignore-unclaimed-state | false | +| execution.target | remote | +| jobmanager.rpc.address | localhost | +| rest.port | $VAR_REST_PORT | ++-------------------------------------------------+--------------+ +8 rows in set !ok # set illegal value @@ -49,16 +50,17 @@ java.lang.IllegalArgumentException: No enum constant org.apache.flink.table.api. set; !output -+-------------------------------------------------+-----------+ -| key | value | -+-------------------------------------------------+-----------+ -| execution.attached | true | -| execution.shutdown-on-attached-exit | false | -| execution.state-recovery.claim-mode | NO_CLAIM | -| execution.state-recovery.ignore-unclaimed-state | false | -| execution.target | remote | -| jobmanager.rpc.address | localhost | -| rest.port | $VAR_REST_PORT | -+-------------------------------------------------+-----------+ -7 rows in set ++-------------------------------------------------+--------------+ +| key | value | ++-------------------------------------------------+--------------+ +| $internal.deployment.config-dir | /dummy/conf/ | +| execution.attached | true | +| execution.shutdown-on-attached-exit | false | +| execution.state-recovery.claim-mode | NO_CLAIM | +| execution.state-recovery.ignore-unclaimed-state | false | +| execution.target | remote | +| jobmanager.rpc.address | localhost | +| rest.port | $VAR_REST_PORT | ++-------------------------------------------------+--------------+ +8 rows in set !ok diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot index f4d14c6089f..8da82fac080 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot +++ b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v4.snapshot @@ -341,6 +341,46 @@ } } } + }, { + "url" : "/sessions/:session_handle/scripts", + "method" : "POST", + "status-code" : "200 OK", + "file-upload" : false, + "path-parameters" : { + "pathParameters" : [ { + "key" : "session_handle" + } ] + }, + "query-parameters" : { + "queryParameters" : [ ] + }, + "request" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:application:DeployScriptRequestBody", + "properties" : { + "script" : { + "type" : "string" + }, + "scriptUri" : { + "type" : "string" + }, + "executionConfig" : { + "type" : "object", + "additionalProperties" : { + "type" : "string" + } + } + } + }, + "response" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:application:DeployScriptResponseBody", + "properties" : { + "clusterID" : { + "type" : "string" + } + } + } }, { "url" : "/sessions/:session_handle/statements", "method" : "POST", diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java index 2e01fbcad81..5c22f6b9d73 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/application/SqlDriver.java @@ -62,12 +62,12 @@ public class SqlDriver { public static final Option OPTION_SQL_FILE = Option.builder() - .longOpt("scriptPath") + .longOpt("scriptUri") .numberOfArgs(1) - .desc("SQL script file path. It supports to fetch files from the DFS or HTTP.") + .desc("SQL script file URI. It supports to fetch files from the DFS or HTTP.") .build(); - public static final Option OPTION_SQL_STATEMENTS = + public static final Option OPTION_SQL_SCRIPT = Option.builder().longOpt("script").numberOfArgs(1).desc("Script content.").build(); private static final String RUNNER_CLASS_NAME = @@ -146,7 +146,7 @@ public class SqlDriver { static Options getSqlDriverOptions() { Options options = new Options(); options.addOption(OPTION_SQL_FILE); - options.addOption(OPTION_SQL_STATEMENTS); + options.addOption(OPTION_SQL_SCRIPT); return options; } @@ -161,12 +161,12 @@ public class SqlDriver { String content = getContent(line.getOptionValue(OPTION_SQL_FILE.getLongOpt())); if (content == null) { return Preconditions.checkNotNull( - line.getOptionValue(OPTION_SQL_STATEMENTS.getLongOpt()), - "Please use \"--script\" or \"--scriptPath\" to specify script either."); + line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()), + "Please use \"--script\" or \"--scriptUri\" to specify script either."); } else { Preconditions.checkArgument( - line.getOptionValue(OPTION_SQL_STATEMENTS.getLongOpt()) == null, - "Don't set \"--script\" or \"--scriptPath\" together."); + line.getOptionValue(OPTION_SQL_SCRIPT.getLongOpt()) == null, + "Don't set \"--script\" or \"--scriptUri\" together."); return content; } } catch (ParseException | URISyntaxException e) {