This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b48cddf12a3 [FLINK-30416][sql-gateway] Add configureSession REST API
in the SQL Gateway (#21525)
b48cddf12a3 is described below
commit b48cddf12a3171cf1b6ddaa675a6b81eea1254bf
Author: yuzelin <[email protected]>
AuthorDate: Wed Jan 4 16:21:27 2023 +0800
[FLINK-30416][sql-gateway] Add configureSession REST API in the SQL Gateway
(#21525)
This closes #21525
---
.../table/gateway/rest/SqlGatewayRestEndpoint.java | 8 +
.../handler/session/ConfigureSessionHandler.java | 65 ++++
.../rest/header/SqlGatewayMessageHeaders.java | 7 +-
.../header/session/ConfigureSessionHeaders.java | 101 ++++++
.../session/ConfigureSessionRequestBody.java | 57 ++++
.../rest/util/SqlGatewayRestAPIVersion.java | 31 +-
...CaseITTest.java => OperationRelatedITCase.java} | 2 +-
...stAPIITTestBase.java => RestAPIITCaseBase.java} | 23 +-
...onCaseITTest.java => SessionRelatedITCase.java} | 71 ++--
.../gateway/rest/SqlGatewayRestEndpointITCase.java | 339 +++++++++----------
.../SqlGatewayRestEndpointStatementITCase.java | 32 +-
.../rest/{UtilCaseITTest.java => UtilITCase.java} | 2 +-
.../table/gateway/rest/util/TestingRestClient.java | 51 +++
.../{ => util}/TestingSqlGatewayRestEndpoint.java | 16 +-
.../resources/sql_gateway_rest_api_v2.snapshot | 357 +++++++++++++++++++++
15 files changed, 933 insertions(+), 229 deletions(-)
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
index 41afe0709f3..3d063d789ea 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
@@ -28,6 +28,7 @@ import
org.apache.flink.table.gateway.rest.handler.operation.CancelOperationHand
import
org.apache.flink.table.gateway.rest.handler.operation.CloseOperationHandler;
import
org.apache.flink.table.gateway.rest.handler.operation.GetOperationStatusHandler;
import org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler;
+import
org.apache.flink.table.gateway.rest.handler.session.ConfigureSessionHandler;
import
org.apache.flink.table.gateway.rest.handler.session.GetSessionConfigHandler;
import org.apache.flink.table.gateway.rest.handler.session.OpenSessionHandler;
import
org.apache.flink.table.gateway.rest.handler.session.TriggerSessionHeartbeatHandler;
@@ -39,6 +40,7 @@ import
org.apache.flink.table.gateway.rest.header.operation.CancelOperationHeade
import
org.apache.flink.table.gateway.rest.header.operation.CloseOperationHeaders;
import
org.apache.flink.table.gateway.rest.header.operation.GetOperationStatusHeaders;
import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import
org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders;
import
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
import
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
@@ -90,6 +92,12 @@ public class SqlGatewayRestEndpoint extends
RestServerEndpoint implements SqlGat
service, responseHeaders,
CloseSessionHeaders.getInstance());
handlers.add(Tuple2.of(CloseSessionHeaders.getInstance(),
closeSessionHandler));
+ // Configure session
+ ConfigureSessionHandler configureSessionHandler =
+ new ConfigureSessionHandler(
+ service, responseHeaders,
ConfigureSessionHeaders.getINSTANCE());
+ handlers.add(Tuple2.of(ConfigureSessionHeaders.getINSTANCE(),
configureSessionHandler));
+
// Get session configuration
GetSessionConfigHandler getSessionConfigHandler =
new GetSessionConfigHandler(
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java
new file mode 100644
index 00000000000..0b69e73ccde
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/session/ConfigureSessionHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.handler.session;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
+import
org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
+import
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/** Handler to configure a session with a statement. */
+public class ConfigureSessionHandler
+ extends AbstractSqlGatewayRestHandler<
+ ConfigureSessionRequestBody, EmptyResponseBody,
SessionMessageParameters> {
+
+ public ConfigureSessionHandler(
+ SqlGatewayService service,
+ Map<String, String> responseHeaders,
+ MessageHeaders<ConfigureSessionRequestBody, EmptyResponseBody,
SessionMessageParameters>
+ messageHeaders) {
+ super(service, responseHeaders, messageHeaders);
+ }
+
+ @Override
+ protected CompletableFuture<EmptyResponseBody> handleRequest(
+ SqlGatewayRestAPIVersion version,
+ @Nonnull HandlerRequest<ConfigureSessionRequestBody> request)
+ throws RestHandlerException {
+ SessionHandle sessionHandle =
request.getPathParameter(SessionHandleIdPathParameter.class);
+ String statement = request.getRequestBody().getStatement();
+ Long timeout = request.getRequestBody().getTimeout();
+ timeout = timeout == null ? 0L : timeout;
+
+ service.configureSession(sessionHandle, statement, timeout);
+
+ return
CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java
index 92d35d164ba..8fbf3db4c11 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/SqlGatewayMessageHeaders.java
@@ -25,8 +25,9 @@ import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
+import java.util.stream.Collectors;
/**
* This class links {@link RequestBody}s to {@link ResponseBody}s types and
contains meta-data
@@ -44,6 +45,8 @@ public interface SqlGatewayMessageHeaders<
@Override
default Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
- return Collections.singleton(SqlGatewayRestAPIVersion.V1);
+ return Arrays.stream(SqlGatewayRestAPIVersion.values())
+ .filter(SqlGatewayRestAPIVersion::isStableVersion)
+ .collect(Collectors.toList());
}
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java
new file mode 100644
index 00000000000..a11b65b09cf
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/session/ConfigureSessionHeaders.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.header.session;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
+import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
+import
org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
+import
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+
+import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/** Message headers for configuring a session. */
+public class ConfigureSessionHeaders
+ implements SqlGatewayMessageHeaders<
+ ConfigureSessionRequestBody, EmptyResponseBody,
SessionMessageParameters> {
+
+ private static final ConfigureSessionHeaders INSTANCE = new
ConfigureSessionHeaders();
+
+ private static final String URL =
+ "/sessions/:" + SessionHandleIdPathParameter.KEY +
"/configure-session";
+
+ @Override
+ public Class<EmptyResponseBody> getResponseClass() {
+ return EmptyResponseBody.class;
+ }
+
+ @Override
+ public HttpResponseStatus getResponseStatusCode() {
+ return HttpResponseStatus.OK;
+ }
+
+ @Override
+ public String getDescription() {
+ return "Configures the session with the statement which could be:\n"
+ + "CREATE TABLE, DROP TABLE, ALTER TABLE, "
+ + "CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
+ + "CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, "
+ + "CREATE CATALOG, DROP CATALOG, "
+ + "USE CATALOG, USE [CATALOG.]DATABASE, "
+ + "CREATE VIEW, DROP VIEW, "
+ + "LOAD MODULE, UNLOAD MODULE, USE MODULE, "
+ + "ADD JAR.";
+ }
+
+ @Override
+ public Class<ConfigureSessionRequestBody> getRequestClass() {
+ return ConfigureSessionRequestBody.class;
+ }
+
+ @Override
+ public SessionMessageParameters getUnresolvedMessageParameters() {
+ return new SessionMessageParameters();
+ }
+
+ @Override
+ public HttpMethodWrapper getHttpMethod() {
+ return HttpMethodWrapper.POST;
+ }
+
+ @Override
+ public String getTargetRestEndpointURL() {
+ return URL;
+ }
+
+ @Override
+ public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+ return Collections.singleton(SqlGatewayRestAPIVersion.V2);
+ }
+
+ public static ConfigureSessionHeaders getINSTANCE() {
+ return INSTANCE;
+ }
+
+ @Override
+ public String operationId() {
+ return "configureSession";
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java
new file mode 100644
index 00000000000..4b16ffa1413
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.session;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/** {@link RequestBody} for configuring a session. */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class ConfigureSessionRequestBody implements RequestBody {
+
+ private static final String FIELD_NAME_STATEMENT = "statement";
+ private static final String FIELD_NAME_EXECUTION_TIMEOUT =
"executionTimeout";
+
+ @JsonProperty(FIELD_NAME_STATEMENT)
+ private final String statement;
+
+ @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT)
+ @Nullable
+ private final Long timeout;
+
+ public ConfigureSessionRequestBody(
+ @JsonProperty(FIELD_NAME_STATEMENT) String statement,
+ @Nullable @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT) Long
timeout) {
+ this.statement = statement;
+ this.timeout = timeout;
+ }
+
+ public String getStatement() {
+ return statement;
+ }
+
+ @Nullable
+ public Long getTimeout() {
+ return timeout;
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
index cc5706f5d12..2b84aef323a 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestAPIVersion.java
@@ -20,6 +20,11 @@ package org.apache.flink.table.gateway.rest.util;
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
/**
* An enum for all versions of the Sql Gateway REST API.
@@ -34,8 +39,16 @@ public enum SqlGatewayRestAPIVersion
implements RestAPIVersion<SqlGatewayRestAPIVersion>, EndpointVersion {
// The bigger the ordinal(its position in enum declaration), the higher
the level of the
// version.
+
+ // V0 is just for test
V0(false, false),
- V1(true, true);
+
+ // V1 introduces basic APIs for rest endpoint
+ V1(false, true),
+
+ // V2 adds support for configuring a session and allows serializing the
RowData with PLAIN_TEXT
+ // or JSON format.
+ V2(true, true);
private final boolean isDefaultVersion;
@@ -90,7 +103,21 @@ public enum SqlGatewayRestAPIVersion
try {
return valueOf(uri.substring(1, slashIndex).toUpperCase());
} catch (Exception e) {
- return V1;
+ return getDefaultVersion();
}
}
+
+ public static SqlGatewayRestAPIVersion getDefaultVersion() {
+ List<SqlGatewayRestAPIVersion> versions =
+ Arrays.stream(SqlGatewayRestAPIVersion.values())
+ .filter(SqlGatewayRestAPIVersion::isDefaultVersion)
+ .collect(Collectors.toList());
+ Preconditions.checkState(
+ versions.size() == 1,
+ String.format(
+ "Only one default version of Sql Gateway Rest API, but
found %s.",
+ versions.size()));
+
+ return versions.get(0);
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
similarity index 99%
rename from
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java
rename to
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
index ab4c1803294..b5da70444e5 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationCaseITTest.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
@@ -52,7 +52,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
* Test basic logic of handlers inherited from {@link
AbstractSqlGatewayRestHandler} in operation
* related cases.
*/
-class OperationCaseITTest extends RestAPIITTestBase {
+class OperationRelatedITCase extends RestAPIITCaseBase {
private static final String sessionName = "test";
private static final Map<String, String> properties = new HashMap<>();
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
similarity index 81%
rename from
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java
rename to
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
index ceef72b3340..2970d2f25a1 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITTestBase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
@@ -20,36 +20,33 @@ package org.apache.flink.table.gateway.rest;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.test.junit5.MiniClusterExtension;
-import org.apache.flink.util.ExecutorUtils;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** The base class for Rest API IT test. */
-abstract class RestAPIITTestBase {
+abstract class RestAPIITCaseBase {
@RegisterExtension
@Order(1)
@@ -60,9 +57,8 @@ abstract class RestAPIITTestBase {
protected static final SqlGatewayServiceExtension
SQL_GATEWAY_SERVICE_EXTENSION =
new
SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
- @Nullable private static RestClient restClient = null;
+ @Nullable private static TestingRestClient restClient = null;
@Nullable private static String targetAddress = null;
- @Nullable private static ExecutorService executorService = null;
@Nullable private static SqlGatewayRestEndpoint sqlGatewayRestEndpoint =
null;
private static int port = 0;
@@ -75,10 +71,7 @@ abstract class RestAPIITTestBase {
new SqlGatewayRestEndpoint(config,
SQL_GATEWAY_SERVICE_EXTENSION.getService());
sqlGatewayRestEndpoint.start();
InetSocketAddress serverAddress =
checkNotNull(sqlGatewayRestEndpoint.getServerAddress());
- executorService =
- Executors.newFixedThreadPool(
- 1, new
ExecutorThreadFactory("rest-client-thread-pool"));
- restClient = new RestClient(new Configuration(), executorService);
+ restClient = getTestingRestClient();
targetAddress = serverAddress.getHostName();
port = serverAddress.getPort();
}
@@ -89,8 +82,6 @@ abstract class RestAPIITTestBase {
sqlGatewayRestEndpoint.close();
checkNotNull(restClient);
restClient.shutdown(Time.seconds(3));
- checkNotNull(executorService);
- ExecutorUtils.gracefulShutdown(3, TimeUnit.SECONDS, executorService);
}
public <
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java
similarity index 80%
rename from
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java
rename to
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java
index afc27904911..2cef32f269f 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionCaseITTest.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SessionRelatedITCase.java
@@ -25,16 +25,20 @@ import
org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
import org.apache.flink.table.gateway.rest.header.session.CloseSessionHeaders;
+import
org.apache.flink.table.gateway.rest.header.session.ConfigureSessionHeaders;
import
org.apache.flink.table.gateway.rest.header.session.GetSessionConfigHeaders;
import org.apache.flink.table.gateway.rest.header.session.OpenSessionHeaders;
import
org.apache.flink.table.gateway.rest.header.session.TriggerSessionHeartbeatHeaders;
import
org.apache.flink.table.gateway.rest.message.session.CloseSessionResponseBody;
+import
org.apache.flink.table.gateway.rest.message.session.ConfigureSessionRequestBody;
import
org.apache.flink.table.gateway.rest.message.session.GetSessionConfigResponseBody;
import
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
import
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
import
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import org.apache.flink.table.gateway.service.session.Session;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -50,13 +54,12 @@ import java.util.concurrent.ExecutionException;
import static
org.apache.flink.table.gateway.rest.handler.session.CloseSessionHandler.CLOSE_MESSAGE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
/**
* Test basic logic of handlers inherited from {@link
AbstractSqlGatewayRestHandler} in session
* related cases.
*/
-class SessionCaseITTest extends RestAPIITTestBase {
+class SessionRelatedITCase extends RestAPIITCaseBase {
private static final String SESSION_NAME = "test";
private static final Map<String, String> properties = new HashMap<>();
@@ -77,6 +80,33 @@ class SessionCaseITTest extends RestAPIITTestBase {
CloseSessionHeaders.getInstance();
private static final EmptyRequestBody emptyRequestBody =
EmptyRequestBody.getInstance();
+ private SessionHandle sessionHandle;
+
+ private SessionMessageParameters sessionMessageParameters;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ CompletableFuture<OpenSessionResponseBody> response =
+ sendRequest(openSessionHeaders, emptyParameters,
openSessionRequestBody);
+ String sessionHandleId = response.get().getSessionHandle();
+ assertThat(sessionHandleId).isNotNull();
+
+ sessionHandle = new SessionHandle(UUID.fromString(sessionHandleId));
+
assertThat(SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle))
+ .isNotNull();
+
+ sessionMessageParameters = new SessionMessageParameters(sessionHandle);
+ }
+
+ @AfterEach
+ public void cleanUp() throws Exception {
+ CompletableFuture<CloseSessionResponseBody> response =
+ sendRequest(closeSessionHeaders, sessionMessageParameters,
emptyRequestBody);
+
+ String status = response.get().getStatus();
+ assertThat(status).isEqualTo(CLOSE_MESSAGE);
+ }
+
@Test
void testCreateAndCloseSessions() throws Exception {
List<SessionHandle> sessionHandles = new ArrayList<>();
@@ -117,16 +147,6 @@ class SessionCaseITTest extends RestAPIITTestBase {
@Test
void testGetSessionConfiguration() throws Exception {
- CompletableFuture<OpenSessionResponseBody> response =
- sendRequest(openSessionHeaders, emptyParameters,
openSessionRequestBody);
- String sessionHandleId = response.get().getSessionHandle();
- assertThat(sessionHandleId).isNotNull();
- SessionHandle sessionHandle = new
SessionHandle(UUID.fromString(sessionHandleId));
-
assertThat(SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle))
- .isNotNull();
-
- SessionMessageParameters sessionMessageParameters =
- new SessionMessageParameters(sessionHandle);
CompletableFuture<GetSessionConfigResponseBody> future =
sendRequest(
GetSessionConfigHeaders.getInstance(),
@@ -140,19 +160,12 @@ class SessionCaseITTest extends RestAPIITTestBase {
@Test
void testTouchSession() throws Exception {
- CompletableFuture<OpenSessionResponseBody> response =
- sendRequest(openSessionHeaders, emptyParameters,
openSessionRequestBody);
- String sessionHandleId = response.get().getSessionHandle();
- assertNotNull(sessionHandleId);
- SessionHandle sessionHandle = new
SessionHandle(UUID.fromString(sessionHandleId));
Session session =
SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager().getSession(sessionHandle);
assertThat(session).isNotNull();
long lastAccessTime = session.getLastAccessTime();
- SessionMessageParameters sessionMessageParameters =
- new SessionMessageParameters(sessionHandle);
CompletableFuture<EmptyResponseBody> future =
sendRequest(
TriggerSessionHeartbeatHeaders.getInstance(),
@@ -161,4 +174,24 @@ class SessionCaseITTest extends RestAPIITTestBase {
future.get();
assertThat(session.getLastAccessTime() > lastAccessTime).isTrue();
}
+
+ @Test
+ void testConfigureSession() throws Exception {
+ ConfigureSessionRequestBody configureSessionRequestBody =
+ new ConfigureSessionRequestBody("set 'test' = 'configure';",
-1L);
+
+ CompletableFuture<EmptyResponseBody> response =
+ sendRequest(
+ ConfigureSessionHeaders.getINSTANCE(),
+ sessionMessageParameters,
+ configureSessionRequestBody);
+ response.get();
+
+ assertThat(
+ SQL_GATEWAY_SERVICE_EXTENSION
+ .getSessionManager()
+ .getSession(sessionHandle)
+ .getSessionConfig())
+ .containsEntry("test", "configure");
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
index 3c2a8776a33..e52a42ee354 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
@@ -22,24 +22,25 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.testutils.BlockerSync;
+import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.RestClient;
-import org.apache.flink.runtime.rest.RestServerEndpoint;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.util.RestClientException;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.rpc.exceptions.EndpointNotStartedException;
import org.apache.flink.table.gateway.api.SqlGatewayService;
import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
+import org.apache.flink.table.gateway.rest.util.TestingSqlGatewayRestEndpoint;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -49,44 +50,49 @@ import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Objects;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for {@link SqlGatewayRestEndpoint}. */
class SqlGatewayRestEndpointITCase {
- private static final SqlGatewayService service = null;
+ private static final SqlGatewayService SERVICE = null;
- private static RestServerEndpoint serverEndpoint;
- private static RestClient restClient;
+ private static SqlGatewayRestEndpoint serverEndpoint;
+ private static TestingRestClient restClient;
private static InetSocketAddress serverAddress;
- private static TestBadCaseHandler testHandler;
- private static TestVersionSelectionHeaders1 header1;
- private static TestVersionSelectionHeaders2 header2;
private static TestBadCaseHeaders badCaseHeader;
- private static TestVersionHandler testVersionHandler1;
- private static TestVersionHandler testVersionHandler2;
+ private static TestBadCaseHandler testHandler;
+
+ private static TestVersionSelectionHeaders0 header0;
+ private static TestVersionSelectionHeadersNot0 headerNot0;
+
+ private static TestVersionHandler testVersionHandler0;
+ private static TestVersionHandler testVersionHandlerNot0;
private static Configuration config;
private static final Time timeout = Time.seconds(10L);
@@ -94,43 +100,38 @@ class SqlGatewayRestEndpointITCase {
@BeforeEach
void setup() throws Exception {
// Test version cases
- header1 = new TestVersionSelectionHeaders1();
- header2 = new TestVersionSelectionHeaders2();
- testVersionHandler1 = new TestVersionHandler(service, header1);
- testVersionHandler2 = new TestVersionHandler(service, header2);
+ header0 = new TestVersionSelectionHeaders0();
+ headerNot0 = new TestVersionSelectionHeadersNot0();
+ testVersionHandler0 = new TestVersionHandler(SERVICE, header0);
+ testVersionHandlerNot0 = new TestVersionHandler(SERVICE, headerNot0);
// Test exception cases
badCaseHeader = new TestBadCaseHeaders();
- testHandler = new TestBadCaseHandler(service);
+ testHandler = new TestBadCaseHandler(SERVICE);
// Init
final String address =
InetAddress.getLoopbackAddress().getHostAddress();
config = getBaseConfig(getFlinkConfig(address, address, "0"));
serverEndpoint =
- TestingSqlGatewayRestEndpoint.builder(config, service)
+ TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
.withHandler(badCaseHeader, testHandler)
- .withHandler(header1, testVersionHandler1)
- .withHandler(header2, testVersionHandler2)
+ .withHandler(header0, testVersionHandler0)
+ .withHandler(headerNot0, testVersionHandlerNot0)
.buildAndStart();
- restClient =
- new RestClient(
- config,
- Executors.newFixedThreadPool(
- 1, new
ExecutorThreadFactory("rest-client-thread-pool")));
+ restClient = getTestingRestClient();
serverAddress = serverEndpoint.getServerAddress();
}
@AfterEach
void stop() throws Exception {
-
if (restClient != null) {
- restClient.shutdown(timeout);
+ restClient.shutdown();
restClient = null;
}
if (serverEndpoint != null) {
- serverEndpoint.closeAsync().get(timeout.getSize(),
timeout.getUnit());
+ serverEndpoint.stop();
serverEndpoint = null;
}
}
@@ -138,74 +139,95 @@ class SqlGatewayRestEndpointITCase {
/** Test that {@link SqlGatewayMessageHeaders} can identify the version
correctly. */
@Test
void testSqlGatewayMessageHeaders() throws Exception {
- // The header only support V1, but send request by V0
+ // The header does not support V0, but the request is sent with V0
assertThatThrownBy(
() ->
restClient.sendRequest(
serverAddress.getHostName(),
serverAddress.getPort(),
- header2,
+ headerNot0,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance(),
Collections.emptyList(),
SqlGatewayRestAPIVersion.V0))
- .isInstanceOf(IllegalArgumentException.class);
-
- // The header only support V1, send request by V1
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ IllegalArgumentException.class,
+ String.format(
+ "The requested version V0 is not
supported by the request (method=%s URL=%s). Supported versions are: %s.",
+ headerNot0.getHttpMethod(),
+ headerNot0.getTargetRestEndpointURL(),
+
headerNot0.getSupportedAPIVersions().stream()
+
.map(RestAPIVersion::getURLVersionPrefix)
+
.collect(Collectors.joining(",")))));
+
+ // The header only supports V0, and the request is sent with V0
CompletableFuture<TestResponse> specifiedVersionResponse =
restClient.sendRequest(
serverAddress.getHostName(),
serverAddress.getPort(),
- header2,
+ header0,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance(),
Collections.emptyList(),
- SqlGatewayRestAPIVersion.V1);
+ SqlGatewayRestAPIVersion.V0);
- TestResponse testResponse1 = specifiedVersionResponse.get(5,
TimeUnit.SECONDS);
- assertThat(testResponse1.getStatus()).isEqualTo("V1");
+ TestResponse testResponse0 =
+ specifiedVersionResponse.get(timeout.getSize(),
timeout.getUnit());
+ assertThat(testResponse0.getStatus()).isEqualTo("V0");
- // The header only support V1, send request by latest version V1
- CompletableFuture<TestResponse> unspecifiedVersionResponse =
+ // The header only supports V0; no version is given, so the client picks it
+ CompletableFuture<TestResponse> unspecifiedVersionResponse0 =
restClient.sendRequest(
serverAddress.getHostName(),
serverAddress.getPort(),
- header2,
+ header0,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance(),
Collections.emptyList());
- TestResponse testResponse2 = unspecifiedVersionResponse.get(5,
TimeUnit.SECONDS);
- assertThat(testResponse2.getStatus()).isEqualTo("V1");
- }
+ TestResponse testResponse1 =
+ unspecifiedVersionResponse0.get(timeout.getSize(),
timeout.getUnit());
+ assertThat(testResponse1.getStatus()).isEqualTo("V0");
- /** Test that requests of different version are routed to correct
handlers. */
- @Test
- void testVersionSelection() throws Exception {
- CompletableFuture<TestResponse> version1Response =
+ // The header supports multiple versions; no version is given, so the
client defaults to the latest supported one
+ CompletableFuture<TestResponse> unspecifiedVersionResponse1 =
restClient.sendRequest(
serverAddress.getHostName(),
serverAddress.getPort(),
- header1,
+ headerNot0,
EmptyMessageParameters.getInstance(),
EmptyRequestBody.getInstance(),
- Collections.emptyList(),
- SqlGatewayRestAPIVersion.V0);
+ Collections.emptyList());
- TestResponse testResponse = version1Response.get(5, TimeUnit.SECONDS);
- assertThat(testResponse.getStatus()).isEqualTo("V0");
+ TestResponse testResponse2 =
+ unspecifiedVersionResponse1.get(timeout.getSize(),
timeout.getUnit());
+ assertThat(testResponse2.getStatus())
+ .isEqualTo(
+
RestAPIVersion.getLatestVersion(headerNot0.getSupportedAPIVersions())
+ .name());
+ }
- CompletableFuture<TestResponse> version2Response =
- restClient.sendRequest(
- serverAddress.getHostName(),
- serverAddress.getPort(),
- header2,
- EmptyMessageParameters.getInstance(),
- EmptyRequestBody.getInstance(),
- Collections.emptyList(),
- SqlGatewayRestAPIVersion.V1);
- TestResponse testResponse2 = version2Response.get(5, TimeUnit.SECONDS);
- assertThat(testResponse2.getStatus()).isEqualTo("V1");
+ /** Test that requests of different versions are routed to the correct
handlers. */
+ @Test
+ void testVersionSelection() throws Exception {
+ for (SqlGatewayRestAPIVersion version :
SqlGatewayRestAPIVersion.values()) {
+ if (version != SqlGatewayRestAPIVersion.V0) {
+ CompletableFuture<TestResponse> versionResponse =
+ restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ headerNot0,
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList(),
+ version);
+
+ TestResponse testResponse =
+ versionResponse.get(timeout.getSize(),
timeout.getUnit());
+ assertThat(testResponse.getStatus()).isEqualTo(version.name());
+ }
+ }
}
/**
@@ -219,12 +241,13 @@ class SqlGatewayRestEndpointITCase {
OkHttpClient client = new OkHttpClient();
final Request request =
new Request.Builder()
- .url(serverEndpoint.getRestBaseUrl() +
header1.getTargetRestEndpointURL())
+ .url(serverEndpoint.getRestBaseUrl() +
header0.getTargetRestEndpointURL())
.build();
final Response response = client.newCall(request).execute();
assert response.body() != null;
- assertThat(response.body().string()).contains("V1");
+ assertThat(response.body().string())
+ .contains(SqlGatewayRestAPIVersion.getDefaultVersion().name());
}
/**
@@ -254,13 +277,13 @@ class SqlGatewayRestEndpointITCase {
// send second request and verify response
final CompletableFuture<TestResponse> response2 =
sendRequestToTestHandler(new TestRequest(2));
- assertThat(response2.get().status).isEqualTo("2");
+ assertThat(response2.get().getStatus()).isEqualTo("2");
// wake up blocked handler
sync.releaseBlocker();
// verify response to first request
- assertThat(response1.get().status).isEqualTo("1");
+ assertThat(response1.get().getStatus()).isEqualTo("1");
}
@Test
@@ -268,29 +291,35 @@ class SqlGatewayRestEndpointITCase {
assertThatThrownBy(
() -> {
try (TestingSqlGatewayRestEndpoint
restServerEndpoint =
-
TestingSqlGatewayRestEndpoint.builder(config, service)
- .withHandler(header1, testHandler)
+
TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
+ .withHandler(header0, testHandler)
.withHandler(badCaseHeader,
testHandler)
.build()) {
restServerEndpoint.start();
}
})
- .isInstanceOf(FlinkRuntimeException.class);
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ FlinkRuntimeException.class,
+ "Duplicate REST handler instance found. Please
ensure each instance is registered only once."));
}
@Test
- void testEndpointsMustBeUnique() {
+ void testHandlerRegistrationOverlappingIsForbidden() {
assertThatThrownBy(
() -> {
try (TestingSqlGatewayRestEndpoint
restServerEndpoint =
-
TestingSqlGatewayRestEndpoint.builder(config, service)
+
TestingSqlGatewayRestEndpoint.builder(config, SERVICE)
.withHandler(badCaseHeader,
testHandler)
- .withHandler(badCaseHeader,
testVersionHandler1)
+ .withHandler(badCaseHeader,
testVersionHandler0)
.build()) {
restServerEndpoint.start();
}
})
- .isInstanceOf(FlinkRuntimeException.class);
+ .satisfies(
+ FlinkAssertions.anyCauseMatches(
+ FlinkRuntimeException.class,
+ "REST handler registration overlaps with
another registration for"));
}
/**
@@ -338,40 +367,6 @@ class SqlGatewayRestEndpointITCase {
closeRestServerEndpointFuture.get(timeout.getSize(),
timeout.getUnit());
}
- @Test
- void testRestServerBindPort() throws Exception {
- final int portRangeStart = 52300;
- final int portRangeEnd = 52400;
- final String address =
InetAddress.getLoopbackAddress().getHostAddress();
- final Configuration sqlGatewayRestEndpointConfig =
- getBaseConfig(
- getFlinkConfig(address, address, portRangeStart + "-"
+ portRangeEnd));
-
- try (RestServerEndpoint serverEndpoint1 =
-
TestingSqlGatewayRestEndpoint.builder(sqlGatewayRestEndpointConfig, service)
- .build();
- RestServerEndpoint serverEndpoint2 =
-
TestingSqlGatewayRestEndpoint.builder(sqlGatewayRestEndpointConfig, service)
- .build()) {
-
- serverEndpoint1.start();
- serverEndpoint2.start();
-
-
assertThat(Objects.requireNonNull(serverEndpoint1.getServerAddress()).getPort())
- .isNotEqualTo(
-
Objects.requireNonNull(serverEndpoint2.getServerAddress()).getPort());
-
- assertThat(serverEndpoint1.getServerAddress().getPort())
- .isGreaterThanOrEqualTo(portRangeStart);
- assertThat(serverEndpoint1.getServerAddress().getPort())
- .isLessThanOrEqualTo(portRangeEnd);
- assertThat(serverEndpoint2.getServerAddress().getPort())
- .isGreaterThanOrEqualTo(portRangeStart);
- assertThat(serverEndpoint2.getServerAddress().getPort())
- .isLessThanOrEqualTo(portRangeEnd);
- }
- }
-
@Test
void testOnUnavailableRpcEndpointReturns503() {
CompletableFuture<TestResponse> response =
sendRequestToTestHandler(new TestRequest(3));
@@ -383,54 +378,12 @@ class SqlGatewayRestEndpointITCase {
.isEqualTo(HttpResponseStatus.SERVICE_UNAVAILABLE);
}
- private static class TestBadCaseHandler
- extends AbstractSqlGatewayRestHandler<
- TestRequest, TestResponse, EmptyMessageParameters> {
-
- private final OneShotLatch closeLatch = new OneShotLatch();
-
- private CompletableFuture<Void> closeFuture =
CompletableFuture.completedFuture(null);
-
- private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
-
- TestBadCaseHandler(SqlGatewayService sqlGatewayService) {
- super(sqlGatewayService, Collections.emptyMap(), badCaseHeader);
- }
-
- @Override
- public CompletableFuture<Void> closeHandlerAsync() {
- closeLatch.trigger();
- return closeFuture;
- }
-
- @Override
- protected CompletableFuture<TestResponse> handleRequest(
- @Nullable SqlGatewayRestAPIVersion version,
- @NotNull HandlerRequest<TestRequest> request) {
- final int id = request.getRequestBody().id;
- if (id == 3) {
- return FutureUtils.completedExceptionally(
- new EndpointNotStartedException("test exception"));
- }
- return handlerBody.apply(id);
- }
- }
-
- private CompletableFuture<TestResponse> sendRequestToTestHandler(
- final TestRequest testRequest) {
- try {
- return restClient.sendRequest(
- serverAddress.getHostName(),
- serverAddress.getPort(),
- badCaseHeader,
- EmptyMessageParameters.getInstance(),
- testRequest);
- } catch (final IOException e) {
- throw new RuntimeException(e);
- }
- }
+ //
--------------------------------------------------------------------------------------------
+ // Messages
+ //
--------------------------------------------------------------------------------------------
private static class TestRequest implements RequestBody {
+
public final int id;
@JsonCreator
@@ -441,7 +394,7 @@ class SqlGatewayRestEndpointITCase {
private static class TestResponse implements ResponseBody {
- public final String status;
+ private final String status;
@JsonCreator
public TestResponse(@JsonProperty("status") String status) {
@@ -453,6 +406,10 @@ class SqlGatewayRestEndpointITCase {
}
}
+ //
--------------------------------------------------------------------------------------------
+ // Headers
+ //
--------------------------------------------------------------------------------------------
+
private static class TestBadCaseHeaders
implements SqlGatewayMessageHeaders<TestRequest, TestResponse,
EmptyMessageParameters> {
@@ -532,35 +489,89 @@ class SqlGatewayRestEndpointITCase {
}
}
- private static class TestVersionSelectionHeaders1 extends
TestVersionSelectionHeadersBase {
+ private static class TestVersionSelectionHeaders0 extends
TestVersionSelectionHeadersBase {
@Override
public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() {
return Collections.singleton(SqlGatewayRestAPIVersion.V0);
}
}
- private static class TestVersionSelectionHeaders2 extends
TestVersionSelectionHeadersBase {
+ private static class TestVersionSelectionHeadersNot0 extends
TestVersionSelectionHeadersBase {
@Override
public Collection<SqlGatewayRestAPIVersion> getSupportedAPIVersions() {
- return Collections.singleton(SqlGatewayRestAPIVersion.V1);
+ List<SqlGatewayRestAPIVersion> versions =
+ new
ArrayList<>(Arrays.asList(SqlGatewayRestAPIVersion.values()));
+ versions.remove(SqlGatewayRestAPIVersion.V0);
+ return versions;
}
}
+ //
--------------------------------------------------------------------------------------------
+ // Handlers
+ //
--------------------------------------------------------------------------------------------
+
private static class TestVersionHandler
extends AbstractSqlGatewayRestHandler<
EmptyRequestBody, TestResponse, EmptyMessageParameters> {
TestVersionHandler(
- final SqlGatewayService sqlGatewayService,
TestVersionSelectionHeadersBase header) {
+ SqlGatewayService sqlGatewayService,
TestVersionSelectionHeadersBase header) {
super(sqlGatewayService, Collections.emptyMap(), header);
}
@Override
protected CompletableFuture<TestResponse> handleRequest(
@Nullable SqlGatewayRestAPIVersion version,
- @NotNull HandlerRequest<EmptyRequestBody> request) {
+ @Nonnull HandlerRequest<EmptyRequestBody> request) {
assert version != null;
return CompletableFuture.completedFuture(new
TestResponse(version.name()));
}
}
+
+ private static class TestBadCaseHandler
+ extends AbstractSqlGatewayRestHandler<
+ TestRequest, TestResponse, EmptyMessageParameters> {
+
+ private final OneShotLatch closeLatch = new OneShotLatch();
+
+ private CompletableFuture<Void> closeFuture =
CompletableFuture.completedFuture(null);
+
+ private Function<Integer, CompletableFuture<TestResponse>> handlerBody;
+
+ TestBadCaseHandler(SqlGatewayService sqlGatewayService) {
+ super(sqlGatewayService, Collections.emptyMap(), badCaseHeader);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeHandlerAsync() {
+ closeLatch.trigger();
+ return closeFuture;
+ }
+
+ @Override
+ protected CompletableFuture<TestResponse> handleRequest(
+ @Nullable SqlGatewayRestAPIVersion version,
+ @Nonnull HandlerRequest<TestRequest> request) {
+ final int id = request.getRequestBody().id;
+ if (id == 3) {
+ return FutureUtils.completedExceptionally(
+ new EndpointNotStartedException("test exception"));
+ }
+ return handlerBody.apply(id);
+ }
+ }
+
+ private CompletableFuture<TestResponse> sendRequestToTestHandler(
+ final TestRequest testRequest) {
+ try {
+ return restClient.sendRequest(
+ serverAddress.getHostName(),
+ serverAddress.getPort(),
+ badCaseHeader,
+ EmptyMessageParameters.getInstance(),
+ testRequest);
+ } catch (final IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
index 019bcdd8abd..4fc378e9029 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
@@ -32,7 +31,6 @@ import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
-import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler;
import
org.apache.flink.table.gateway.rest.header.statement.ExecuteStatementHeaders;
import
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
@@ -43,11 +41,12 @@ import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsRespons
import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
import
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import
org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
-import org.apache.flink.util.ConfigurationException;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -61,8 +60,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
+import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -78,7 +77,7 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
private static final SqlGatewayRestEndpointExtension
SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
new
SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
- private static final RestClient restClient = getTestRestClient();
+ private static TestingRestClient restClient;
private static final ExecuteStatementHeaders executeStatementHeaders =
ExecuteStatementHeaders.getInstance();
private static SessionMessageParameters sessionMessageParameters;
@@ -96,6 +95,16 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
private SessionHandle sessionHandle;
+ @BeforeAll
+ public static void setup() throws Exception {
+ restClient = getTestingRestClient();
+ }
+
+ @AfterAll
+ public static void cleanUp() throws Exception {
+ restClient.shutdown();
+ }
+
@BeforeEach
@Override
public void before(@TempDir Path temporaryFolder) throws Exception {
@@ -193,17 +202,6 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
.equals(RuntimeExecutionMode.STREAMING);
}
- private static RestClient getTestRestClient() {
- try {
- return new RestClient(
- new Configuration(),
- Executors.newFixedThreadPool(
- 1, new
ExecutorThreadFactory("rest-client-thread-pool")));
- } catch (ConfigurationException e) {
- throw new SqlGatewayException("Cannot get rest client.", e);
- }
- }
-
private class RowDataIterator implements Iterator<RowData> {
private final SessionHandle sessionHandle;
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java
similarity index 98%
rename from
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java
rename to
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java
index 9ff06a22b63..1e0de661150 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilCaseITTest.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/UtilITCase.java
@@ -43,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
* Test basic logic of handlers inherited from {@link
AbstractSqlGatewayRestHandler} in util related
* cases.
*/
-class UtilCaseITTest extends RestAPIITTestBase {
+class UtilITCase extends RestAPIITCaseBase {
private static final GetInfoHeaders getInfoHeaders =
GetInfoHeaders.getInstance();
private static final EmptyRequestBody emptyRequestBody =
EmptyRequestBody.getInstance();
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java
new file mode 100644
index 00000000000..41ac76f2a1c
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingRestClient.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/** Utility for setting up a rest client based on {@link RestClient} with
default settings. */
+public class TestingRestClient extends RestClient {
+
+ private final ExecutorService executorService;
+
+ private TestingRestClient(ExecutorService executorService) throws
ConfigurationException {
+ super(new Configuration(), executorService);
+ this.executorService = executorService;
+ }
+
+ public static TestingRestClient getTestingRestClient() throws Exception {
+ return new TestingRestClient(
+ Executors.newFixedThreadPool(
+ 1, new
ExecutorThreadFactory("rest-client-thread-pool")));
+ }
+
+ public void shutdown() throws Exception {
+ ExecutorUtils.gracefulShutdown(1, TimeUnit.SECONDS, executorService);
+ super.closeAsync().get();
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java
similarity index 85%
rename from
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java
rename to
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java
index 9fef8b86e87..a2424dd41b1 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/TestingSqlGatewayRestEndpoint.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/TestingSqlGatewayRestEndpoint.java
@@ -16,12 +16,13 @@
* limitations under the License.
*/
-package org.apache.flink.table.gateway.rest;
+package org.apache.flink.table.gateway.rest.util;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.rest.SqlGatewayRestEndpoint;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
@@ -35,9 +36,10 @@ import java.util.concurrent.CompletableFuture;
* Utility for setting up a rest server based on {@link
SqlGatewayRestEndpoint} with a given set of
* handlers.
*/
-class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint {
+public class TestingSqlGatewayRestEndpoint extends SqlGatewayRestEndpoint {
- static Builder builder(Configuration configuration, SqlGatewayService
sqlGatewayService) {
+ public static Builder builder(
+ Configuration configuration, SqlGatewayService sqlGatewayService) {
return new Builder(configuration, sqlGatewayService);
}
@@ -45,7 +47,7 @@ class TestingSqlGatewayRestEndpoint extends
SqlGatewayRestEndpoint {
* TestSqlGatewayRestEndpoint.Builder is a utility class for instantiating
a
* TestSqlGatewayRestEndpoint.
*/
- static class Builder {
+ public static class Builder {
private final Configuration configuration;
private final List<Tuple2<RestHandlerSpecification,
ChannelInboundHandler>> handlers =
@@ -57,17 +59,17 @@ class TestingSqlGatewayRestEndpoint extends
SqlGatewayRestEndpoint {
this.sqlGatewayService = sqlGatewayService;
}
- Builder withHandler(
+ public Builder withHandler(
RestHandlerSpecification messageHeaders, ChannelInboundHandler
handler) {
this.handlers.add(Tuple2.of(messageHeaders, handler));
return this;
}
- TestingSqlGatewayRestEndpoint build() throws IOException,
ConfigurationException {
+ public TestingSqlGatewayRestEndpoint build() throws IOException,
ConfigurationException {
return new TestingSqlGatewayRestEndpoint(configuration, handlers,
sqlGatewayService);
}
- TestingSqlGatewayRestEndpoint buildAndStart() throws Exception {
+ public TestingSqlGatewayRestEndpoint buildAndStart() throws Exception {
TestingSqlGatewayRestEndpoint serverEndpoint = build();
serverEndpoint.start();
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
new file mode 100644
index 00000000000..1ed5ab30ab3
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
@@ -0,0 +1,357 @@
+{
+ "calls" : [ {
+ "url" : "/api_versions",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetApiVersionResponseBody",
+ "properties" : {
+ "versions" : {
+ "type" : "array",
+ "items" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "url" : "/info",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetInfoResponseBody",
+ "properties" : {
+ "productName" : {
+ "type" : "string"
+ },
+ "version" : {
+ "type" : "string"
+ }
+ }
+ }
+ }, {
+ "url" : "/sessions",
+ "method" : "POST",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionRequestBody",
+ "properties" : {
+ "sessionName" : {
+ "type" : "string"
+ },
+ "properties" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionResponseBody",
+ "properties" : {
+ "sessionHandle" : {
+ "type" : "string"
+ }
+ }
+ }
+ }, {
+ "url" : "/sessions/:session_handle",
+ "method" : "DELETE",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:CloseSessionResponseBody",
+ "properties" : {
+ "status" : {
+ "type" : "string"
+ }
+ }
+ }
+ }, {
+ "url" : "/sessions/:session_handle",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:GetSessionConfigResponseBody",
+ "properties" : {
+ "properties" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "string"
+ }
+ }
+ }
+ }
+ }, {
+ "url" : "/sessions/:session_handle/configure-session",
+ "method" : "POST",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:ConfigureSessionRequestBody",
+ "properties" : {
+ "statement" : {
+ "type" : "string"
+ },
+ "executionTimeout" : {
+ "type" : "integer"
+ }
+ }
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+ }
+ }, {
+ "url" : "/sessions/:session_handle/heartbeat",
+ "method" : "POST",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyResponseBody"
+ }
+ }, {
+ "url" : "/sessions/:session_handle/operations/:operation_handle/cancel",
+ "method" : "POST",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ }, {
+ "key" : "operation_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
+ "properties" : {
+ "status" : {
+ "type" : "string"
+ }
+ }
+ }
+ }, {
+ "url" : "/sessions/:session_handle/operations/:operation_handle/close",
+ "method" : "DELETE",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ }, {
+ "key" : "operation_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
+ "properties" : {
+ "status" : {
+ "type" : "string"
+ }
+ }
+ }
+ }, {
+ "url" :
"/sessions/:session_handle/operations/:operation_handle/result/:token",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ }, {
+ "key" : "operation_handle"
+ }, {
+ "key" : "token"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:FetchResultsResponseBody",
+ "properties" : {
+ "results" : {
+ "type" : "any"
+ },
+ "resultType" : {
+ "type" : "string"
+ },
+ "nextResultUri" : {
+ "type" : "string"
+ }
+ }
+ }
+ }, {
+ "url" : "/sessions/:session_handle/operations/:operation_handle/status",
+ "method" : "GET",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ }, {
+ "key" : "operation_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
+ "properties" : {
+ "status" : {
+ "type" : "string"
+ }
+ }
+ }
+ }, {
+ "url" : "/sessions/:session_handle/statements",
+ "method" : "POST",
+ "status-code" : "200 OK",
+ "file-upload" : false,
+ "path-parameters" : {
+ "pathParameters" : [ {
+ "key" : "session_handle"
+ } ]
+ },
+ "query-parameters" : {
+ "queryParameters" : [ ]
+ },
+ "request" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementRequestBody",
+ "properties" : {
+ "statement" : {
+ "type" : "string"
+ },
+ "executionTimeout" : {
+ "type" : "integer"
+ },
+ "executionConfig" : {
+ "type" : "object",
+ "additionalProperties" : {
+ "type" : "string"
+ }
+ }
+ }
+ },
+ "response" : {
+ "type" : "object",
+ "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementResponseBody",
+ "properties" : {
+ "operationHandle" : {
+ "type" : "string"
+ }
+ }
+ }
+ } ]
+}