This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 029860f26ce [FLINK-30693][sql-gateway] Supports to specify RowFormat
when fetching results
029860f26ce is described below
commit 029860f26ce8455cdd2bd1244785f42fee0440ca
Author: yuzelin <[email protected]>
AuthorDate: Sun Jan 15 20:55:50 2023 +0800
[FLINK-30693][sql-gateway] Supports to specify RowFormat when fetching
results
This closes #21677
---
.../endpoint/hive/HiveServer2EndpointITCase.java | 4 +-
.../hive/HiveServer2EndpointStatementITCase.java | 75 ++++++++----
.../src/test/resources/endpoint/select.q | 58 +---------
.../table/gateway/api/results/ResultSetImpl.java | 4 +
.../table/gateway/rest/SqlGatewayRestEndpoint.java | 16 ++-
.../handler/statement/FetchResultsHandler.java | 54 ++++++---
.../rest/header/statement/FetchResultsHeaders.java | 78 ++++++++++---
.../statement/FetchResultResponseBodyImpl.java | 84 ++++++++++++++
...ers.java => FetchResultsMessageParameters.java} | 30 +++--
.../statement/FetchResultsResponseBody.java | 52 +++------
.../FetchResultsRowFormatQueryParameter.java | 49 ++++++++
.../statement/NotReadyFetchResultResponse.java} | 48 ++++----
.../serde/FetchResultResponseBodyDeserializer.java | 90 +++++++++++++++
.../serde/FetchResultsResponseBodySerializer.java | 67 +++++++++++
.../flink/table/gateway/rest/serde/ResultInfo.java | 126 ++++++++++++++++-----
...serializer.java => ResultInfoDeserializer.java} | 64 ++++++++---
...onSerializer.java => ResultInfoSerializer.java} | 100 ++++++++++------
.../flink/table/gateway/rest/util/RowFormat.java | 36 ++++++
.../service/operation/OperationManager.java | 5 +-
.../gateway/service/result/NotReadyResult.java | 2 +-
.../gateway/AbstractSqlGatewayStatementITCase.java | 63 ++++++++---
.../table/gateway/rest/OperationRelatedITCase.java | 4 +-
.../table/gateway/rest/RestAPIITCaseBase.java | 4 +-
.../gateway/rest/SqlGatewayRestEndpointITCase.java | 4 +-
.../SqlGatewayRestEndpointStatementITCase.java | 100 +++++++++++-----
.../gateway/rest/SqlGatewayRestEndpointTest.java | 4 +-
.../table/gateway/rest/StatementRelatedITCase.java | 20 +++-
.../rest/serde/ResultInfoJsonSerDeTest.java | 123 ++++++++++++++------
.../rest/util/SqlGatewayRestEndpointExtension.java | 4 +-
...s.java => SqlGatewayRestEndpointTestUtils.java} | 16 ++-
.../gateway/service/SqlGatewayServiceITCase.java | 4 +-
...{resultInfo.txt => result_info_json_format.txt} | 2 +-
.../resources/result_info_plain_text_format.txt | 1 +
.../src/test/resources/sql/begin_statement_set.q | 60 +---------
.../src/test/resources/sql/insert.q | 80 +------------
.../src/test/resources/sql/statement_set.q | 61 +---------
.../resources/sql_gateway_rest_api_v1.snapshot | 14 +--
.../resources/sql_gateway_rest_api_v2.snapshot | 21 +---
38 files changed, 1042 insertions(+), 585 deletions(-)
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 98e8612a54a..71bb5277f37 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -38,6 +38,7 @@ import
org.apache.flink.table.gateway.api.session.SessionEnvironment;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import
org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
@@ -114,7 +115,6 @@ import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
import static
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
import static
org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
-import static
org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
import static
org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -816,7 +816,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
sessionHandle,
() -> {
latch.await();
- return NOT_READY_RESULT;
+ return NotReadyResult.INSTANCE;
});
manipulateOp.accept(
toTOperationHandle(sessionHandle, operationHandle,
TOperationType.UNKNOWN));
diff --git
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
index 09127b995ef..bccc4ffdadd 100644
---
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
+++
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
@@ -31,6 +31,7 @@ import
org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.hive.jdbc.HiveConnection;
import org.apache.hive.service.rpc.thrift.TSessionHandle;
@@ -39,8 +40,6 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -53,6 +52,7 @@ import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
@@ -68,6 +68,16 @@ public class HiveServer2EndpointStatementITCase extends
AbstractSqlGatewayStatem
private Connection connection;
private Statement statement;
+ @Parameters(name = "parameters={0}")
+ public static List<TestParameters> parameters() throws Exception {
+ return Stream.concat(
+ listFlinkSqlTests().stream()
+ .map(path -> new HiveTestParameters(path,
true)),
+ listTestSpecInTheSameModule("endpoint").stream()
+ .map(path -> new HiveTestParameters(path,
false)))
+ .collect(Collectors.toList());
+ }
+
@BeforeEach
@Override
public void before(@TempDir Path temporaryFolder) throws Exception {
@@ -82,16 +92,6 @@ public class HiveServer2EndpointStatementITCase extends
AbstractSqlGatewayStatem
connection.close();
}
- public static Stream<String> listHiveSqlTests() throws Exception {
- return listTestSpecInTheSameModule("endpoint");
- }
-
- @ParameterizedTest
- @MethodSource("listHiveSqlTests")
- public void testHiveSqlStatements(String sqlPath) throws Exception {
- runTest(sqlPath);
- }
-
@Override
protected String runSingleStatement(String sql) throws Exception {
statement.execute(sql);
@@ -151,18 +151,20 @@ public class HiveServer2EndpointStatementITCase extends
AbstractSqlGatewayStatem
}
@Override
- protected void resetSessionForFlinkSqlStatements() throws Exception {
- for (String sql :
- Arrays.asList(
- "RESET",
- "CREATE CATALOG `default_catalog` \n"
- + "WITH (\n"
- + "'type' = 'generic_in_memory',\n"
- + "'default-database' = 'default_database')",
- "USE CATALOG `default_catalog`",
- "DROP CATALOG hive",
- "UNLOAD MODULE hive")) {
- runSingleStatement(sql);
+ protected void prepareEnvironment() throws Exception {
+ if (((HiveTestParameters) parameters).getResetEnvironment()) {
+ for (String sql :
+ Arrays.asList(
+ "RESET",
+ "CREATE CATALOG `default_catalog` \n"
+ + "WITH (\n"
+ + "'type' = 'generic_in_memory',\n"
+ + "'default-database' =
'default_database')",
+ "USE CATALOG `default_catalog`",
+ "DROP CATALOG hive",
+ "UNLOAD MODULE hive")) {
+ runSingleStatement(sql);
+ }
}
}
@@ -183,4 +185,29 @@ public class HiveServer2EndpointStatementITCase extends
AbstractSqlGatewayStatem
return DataTypes.ROW(fields);
}
+
+ private static class HiveTestParameters extends TestParameters {
+
+ private final boolean resetEnvironment;
+
+ public HiveTestParameters(String sqlPath, boolean resetEnvironment) {
+ super(sqlPath);
+ this.resetEnvironment = resetEnvironment;
+ }
+
+ public boolean getResetEnvironment() {
+ return resetEnvironment;
+ }
+
+ @Override
+ public String toString() {
+ return "HiveTestParameters{"
+ + "resetEnvironment="
+ + resetEnvironment
+ + ", sqlPath='"
+ + sqlPath
+ + '\''
+ + '}';
+ }
+ }
}
diff --git
a/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q
b/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q
index 26f6c01de8b..2c7400e2e53 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q
+++ b/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q
@@ -27,35 +27,10 @@ CREATE TABLE dummy (
1 row in set
!ok
-SET $internal.pipeline.job-id = 879d95431dd5516bf93a44c9e0fdf3b5;
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
INSERT INTO dummy VALUES (1);
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| 879d95431dd5516bf93a44c9e0fdf3b5 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET $internal.pipeline.job-id;
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
# ==========================================================================
# test all types
@@ -87,37 +62,12 @@ CREATE TABLE hive_types_table (
1 row in set
!ok
-SET $internal.pipeline.job-id = 40eadf85976ed00808ff1c6b20e8d616;
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
INSERT INTO hive_types_table
SELECT true, 1, 2, 3, 4, 5.0, 6.0, 7.1111, 'Hello World', 'Flink Hive',
'2112-12-12 00:00:05.006', '2002-12-13', 'byte', MAP('a', 'b'), ARRAY(1)
FROM dummy LIMIT 1;
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| 40eadf85976ed00808ff1c6b20e8d616 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET $internal.pipeline.job-id;
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
SELECT * FROM hive_types_table;
!output
diff --git
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java
index 7ab13606ecc..ea94f1c4568 100644
---
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java
+++
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java
@@ -88,6 +88,10 @@ public class ResultSetImpl implements ResultSet {
return data;
}
+ public RowDataToStringConverter getConverter() {
+ return converter;
+ }
+
@Override
public boolean isQueryResult() {
return isQueryResult;
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
index 3d0114219fc..cb573afd5b9 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
@@ -167,10 +167,18 @@ public class SqlGatewayRestEndpoint extends
RestServerEndpoint implements SqlGat
handlers.add(Tuple2.of(ExecuteStatementHeaders.getInstance(),
executeStatementHandler));
// Fetch results
- FetchResultsHandler fetchResultsHandler =
- new FetchResultsHandler(
- service, responseHeaders,
FetchResultsHeaders.getInstance());
- handlers.add(Tuple2.of(FetchResultsHeaders.getInstance(),
fetchResultsHandler));
+ handlers.add(
+ Tuple2.of(
+ FetchResultsHeaders.getDefaultInstance(),
+ new FetchResultsHandler(
+ service,
+ responseHeaders,
+ FetchResultsHeaders.getDefaultInstance())));
+ handlers.add(
+ Tuple2.of(
+ FetchResultsHeaders.getInstanceV1(),
+ new FetchResultsHandler(
+ service, responseHeaders,
FetchResultsHeaders.getInstanceV1())));
}
@Override
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
index 52b18be0bed..8cb4ff80802 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
@@ -19,6 +19,8 @@
package org.apache.flink.table.gateway.rest.handler.statement;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.table.gateway.api.SqlGatewayService;
@@ -30,14 +32,17 @@ import
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler
import
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPathParameter;
import
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultResponseBodyImpl;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
-import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsRowFormatQueryParameter;
import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
+import
org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -45,49 +50,64 @@ import java.util.concurrent.CompletableFuture;
/** Handler to fetch results. */
public class FetchResultsHandler
extends AbstractSqlGatewayRestHandler<
- EmptyRequestBody, FetchResultsResponseBody,
FetchResultsTokenParameters> {
+ EmptyRequestBody, FetchResultsResponseBody,
FetchResultsMessageParameters> {
public FetchResultsHandler(
SqlGatewayService service,
Map<String, String> responseHeaders,
- MessageHeaders<EmptyRequestBody, FetchResultsResponseBody,
FetchResultsTokenParameters>
+ MessageHeaders<
+ EmptyRequestBody,
+ FetchResultsResponseBody,
+ FetchResultsMessageParameters>
messageHeaders) {
super(service, responseHeaders, messageHeaders);
}
@Override
protected CompletableFuture<FetchResultsResponseBody> handleRequest(
- SqlGatewayRestAPIVersion version, @Nonnull
HandlerRequest<EmptyRequestBody> request) {
+ SqlGatewayRestAPIVersion version, @Nonnull
HandlerRequest<EmptyRequestBody> request)
+ throws RestHandlerException {
// Parse the parameters
SessionHandle sessionHandle =
request.getPathParameter(SessionHandleIdPathParameter.class);
OperationHandle operationHandle =
request.getPathParameter(OperationHandleIdPathParameter.class);
Long token =
request.getPathParameter(FetchResultsTokenPathParameter.class);
+ RowFormat rowFormat =
+ HandlerRequestUtils.getQueryParameter(
+ request, FetchResultsRowFormatQueryParameter.class,
RowFormat.JSON);
// Get the statement results
- @Nullable ResultSet resultSet;
- @Nullable String resultType;
- Long nextToken;
-
+ ResultSet resultSet;
try {
resultSet =
service.fetchResults(sessionHandle, operationHandle,
token, Integer.MAX_VALUE);
- nextToken = resultSet.getNextToken();
- resultType = resultSet.getResultType().toString();
} catch (Exception e) {
throw new SqlGatewayException(e);
}
- // Build the response
+ ResultSet.ResultType resultType = resultSet.getResultType();
+ Long nextToken = resultSet.getNextToken();
String nextResultUri =
FetchResultsHeaders.buildNextUri(
- version.name().toLowerCase(),
+ version,
sessionHandle.getIdentifier().toString(),
operationHandle.getIdentifier().toString(),
- nextToken);
+ nextToken,
+ rowFormat);
- return CompletableFuture.completedFuture(
- new FetchResultsResponseBody(
- ResultInfo.createResultInfo(resultSet), resultType,
nextResultUri));
+ // Build the response
+ if (resultType == ResultSet.ResultType.NOT_READY) {
+ return CompletableFuture.completedFuture(
+ new NotReadyFetchResultResponse(nextResultUri));
+ } else {
+ return CompletableFuture.completedFuture(
+ new FetchResultResponseBodyImpl(
+ resultType,
+ resultSet.isQueryResult(),
+ resultSet.getJobID(),
+ resultSet.getResultKind(),
+ ResultInfo.createResultInfo(resultSet, rowFormat),
+ nextResultUri));
+ }
}
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
index 274b24ff129..49399ed67d7 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
@@ -20,23 +20,35 @@ package
org.apache.flink.table.gateway.rest.header.statement;
import org.apache.flink.runtime.rest.HttpMethodWrapper;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
import
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPathParameter;
import
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
-import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion.V1;
+
/** Message headers for fetching results. */
public class FetchResultsHeaders
implements SqlGatewayMessageHeaders<
- EmptyRequestBody, FetchResultsResponseBody,
FetchResultsTokenParameters> {
+ EmptyRequestBody, FetchResultsResponseBody,
FetchResultsMessageParameters> {
- private static final FetchResultsHeaders INSTANCE = new
FetchResultsHeaders();
+ private static final FetchResultsHeaders INSTANCE_V1 = new
FetchResultsHeaders(V1);
+ private static final FetchResultsHeaders DEFAULT_INSTANCE =
+ new
FetchResultsHeaders(SqlGatewayRestAPIVersion.getDefaultVersion());
public static final String URL =
"/sessions/:"
@@ -46,6 +58,41 @@ public class FetchResultsHeaders
+ "/result/:"
+ FetchResultsTokenPathParameter.KEY;
+ private final SqlGatewayRestAPIVersion version;
+
+ private FetchResultsHeaders(SqlGatewayRestAPIVersion version) {
+ this.version = version;
+ }
+
+ public static FetchResultsHeaders getInstanceV1() {
+ return INSTANCE_V1;
+ }
+
+ public static FetchResultsHeaders getDefaultInstance() {
+ return DEFAULT_INSTANCE;
+ }
+
+ public static @Nullable String buildNextUri(
+ SqlGatewayRestAPIVersion version,
+ String sessionId,
+ String operationId,
+ Long nextToken,
+ RowFormat rowFormat) {
+ if (nextToken == null) {
+ return null;
+ }
+
+ if (version == V1) {
+ return String.format(
+ "/%s/sessions/%s/operations/%s/result/%s",
+ version, sessionId, operationId, nextToken);
+ } else {
+ return String.format(
+ "/%s/sessions/%s/operations/%s/result/%s?rowFormat=%s",
+ version, sessionId, operationId, nextToken, rowFormat);
+ }
+ }
+
@Override
public Class<FetchResultsResponseBody> getResponseClass() {
return FetchResultsResponseBody.class;
@@ -71,20 +118,15 @@ public class FetchResultsHeaders
return URL;
}
- public static FetchResultsHeaders getInstance() {
- return INSTANCE;
- }
-
- @Nullable
- public static String buildNextUri(
- String version, String sessionId, String operationId, Long
nextToken) {
- if (nextToken != null) {
- return String.format(
- "/%s/sessions/%s/operations/%s/result/%s",
- version, sessionId, operationId, nextToken);
+ @Override
+ public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+ if (version == V1) {
+ return Collections.singleton(V1);
} else {
- // Empty uri indicates there is no more data
- return null;
+ return Arrays.stream(SqlGatewayRestAPIVersion.values())
+ .filter(SqlGatewayRestAPIVersion::isStableVersion)
+ .filter(version -> version != V1)
+ .collect(Collectors.toList());
}
}
@@ -94,8 +136,8 @@ public class FetchResultsHeaders
}
@Override
- public FetchResultsTokenParameters getUnresolvedMessageParameters() {
- return new FetchResultsTokenParameters();
+ public FetchResultsMessageParameters getUnresolvedMessageParameters() {
+ return new FetchResultsMessageParameters(version);
}
@Override
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultResponseBodyImpl.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultResponseBodyImpl.java
new file mode 100644
index 00000000000..14a19f3d677
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultResponseBodyImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+
+import javax.annotation.Nullable;
+
+/** The implementation of the {@link FetchResultsResponseBody} with ready
results. */
+public class FetchResultResponseBodyImpl implements FetchResultsResponseBody {
+
+ private final ResultInfo results;
+ private final ResultSet.ResultType resultType;
+ @Nullable private final String nextResultUri;
+ private final boolean isQueryResult;
+ @Nullable private final JobID jobID;
+ private final ResultKind resultKind;
+
+ public FetchResultResponseBodyImpl(
+ ResultSet.ResultType resultType,
+ boolean isQueryResult,
+ @Nullable JobID jobID,
+ ResultKind resultKind,
+ ResultInfo results,
+ @Nullable String nextResultUri) {
+ this.results = results;
+ this.resultType = resultType;
+ this.nextResultUri = nextResultUri;
+ this.isQueryResult = isQueryResult;
+ this.jobID = jobID;
+ this.resultKind = resultKind;
+ }
+
+ @Override
+ public ResultInfo getResults() {
+ return results;
+ }
+
+ @Override
+ public ResultSet.ResultType getResultType() {
+ return resultType;
+ }
+
+ @Nullable
+ @Override
+ public String getNextResultUri() {
+ return nextResultUri;
+ }
+
+ @Override
+ public boolean isQueryResult() {
+ return isQueryResult;
+ }
+
+ @Nullable
+ @Override
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ @Override
+ public ResultKind getResultKind() {
+ return resultKind;
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsMessageParameters.java
similarity index 68%
rename from
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
rename to
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsMessageParameters.java
index 132dc631a7b..0f74a1dc628 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsMessageParameters.java
@@ -25,13 +25,15 @@ import
org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.session.SessionHandle;
import
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPathParameter;
import
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-/** {@link MessagePathParameter} for fetching results. */
-public class FetchResultsTokenParameters extends MessageParameters {
+/** {@link MessageParameters} for fetching results. */
+public class FetchResultsMessageParameters extends MessageParameters {
private final SessionHandleIdPathParameter sessionHandleIdPathParameter =
new SessionHandleIdPathParameter();
@@ -42,15 +44,25 @@ public class FetchResultsTokenParameters extends
MessageParameters {
private final FetchResultsTokenPathParameter
fetchResultsTokenPathParameter =
new FetchResultsTokenPathParameter();
- public FetchResultsTokenParameters() {
- // nothing to resolve
+ private final FetchResultsRowFormatQueryParameter
fetchResultsRowFormatQueryParameter =
+ new FetchResultsRowFormatQueryParameter();
+
+ private final SqlGatewayRestAPIVersion version;
+
+ public FetchResultsMessageParameters(SqlGatewayRestAPIVersion version) {
+ this.version = version;
}
- public FetchResultsTokenParameters(
- SessionHandle sessionHandle, OperationHandle operationHandle, Long
token) {
+ public FetchResultsMessageParameters(
+ SessionHandle sessionHandle,
+ OperationHandle operationHandle,
+ Long token,
+ RowFormat rowFormat) {
+ this.version = SqlGatewayRestAPIVersion.getDefaultVersion();
sessionHandleIdPathParameter.resolve(sessionHandle);
operationHandleIdPathParameter.resolve(operationHandle);
fetchResultsTokenPathParameter.resolve(token);
+
fetchResultsRowFormatQueryParameter.resolve(Collections.singletonList(rowFormat));
}
@Override
@@ -63,6 +75,10 @@ public class FetchResultsTokenParameters extends
MessageParameters {
@Override
public Collection<MessageQueryParameter<?>> getQueryParameters() {
- return Collections.emptyList();
+ if (version == SqlGatewayRestAPIVersion.V1) {
+ return Collections.emptyList();
+ } else {
+ return
Collections.singletonList(fetchResultsRowFormatQueryParameter);
+ }
}
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
index ccdaee00731..20d24b4aad0 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
@@ -18,57 +18,35 @@
package org.apache.flink.table.gateway.rest.message.statement;
+import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import
org.apache.flink.table.gateway.rest.serde.FetchResultResponseBodyDeserializer;
+import
org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
-import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonDeserializer;
-import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonSerializer;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
import javax.annotation.Nullable;
/** {@link ResponseBody} for executing a statement. */
-public class FetchResultsResponseBody implements ResponseBody {
+@JsonSerialize(using = FetchResultsResponseBodySerializer.class)
+@JsonDeserialize(using = FetchResultResponseBodyDeserializer.class)
+public interface FetchResultsResponseBody extends ResponseBody {
- private static final String FIELD_RESULT_TYPE = "resultType";
- private static final String FIELD_RESULTS = "results";
- private static final String FIELD_NEXT_RESULT_URI = "nextResultUri";
+ ResultInfo getResults();
- @JsonProperty(FIELD_RESULTS)
- @JsonSerialize(using = ResultInfoJsonSerializer.class)
- @JsonDeserialize(using = ResultInfoJsonDeserializer.class)
- private final ResultInfo results;
+ ResultSet.ResultType getResultType();
- @JsonProperty(FIELD_RESULT_TYPE)
- private final String resultType;
-
- @JsonProperty(FIELD_NEXT_RESULT_URI)
@Nullable
- private final String nextResultUri;
-
- @JsonCreator
- public FetchResultsResponseBody(
- @JsonProperty(FIELD_RESULTS) ResultInfo results,
- @JsonProperty(FIELD_RESULT_TYPE) String resultType,
- @Nullable @JsonProperty(FIELD_NEXT_RESULT_URI) String
nextResultUri) {
- this.results = results;
- this.resultType = resultType;
- this.nextResultUri = nextResultUri;
- }
+ String getNextResultUri();
- public ResultInfo getResults() {
- return results;
- }
-
- public String getResultType() {
- return resultType;
- }
+ boolean isQueryResult();
@Nullable
- public String getNextResultUri() {
- return nextResultUri;
- }
+ JobID getJobID();
+
+ ResultKind getResultKind();
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsRowFormatQueryParameter.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsRowFormatQueryParameter.java
new file mode 100644
index 00000000000..8ee20160937
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsRowFormatQueryParameter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+
+/**
+ * A {@link MessageQueryParameter} that parses the 'rowFormat' query parameter
for fetching results.
+ */
+public class FetchResultsRowFormatQueryParameter extends
MessageQueryParameter<RowFormat> {
+
+ public static final String KEY = "rowFormat";
+
+ public FetchResultsRowFormatQueryParameter() {
+ super(KEY, MessageParameterRequisiteness.MANDATORY);
+ }
+
+ @Override
+ public RowFormat convertStringToValue(String value) {
+ return RowFormat.valueOf(value.toUpperCase());
+ }
+
+ @Override
+ public String convertValueToString(RowFormat value) {
+ return value.name();
+ }
+
+ @Override
+ public String getDescription() {
+ return "The row format to serialize the RowData.";
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/NotReadyFetchResultResponse.java
similarity index 50%
copy from
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
copy to
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/NotReadyFetchResultResponse.java
index e56b6024c02..0eda21724e1 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/NotReadyFetchResultResponse.java
@@ -16,60 +16,58 @@
* limitations under the License.
*/
-package org.apache.flink.table.gateway.service.result;
+package org.apache.flink.table.gateway.rest.message.statement;
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.ResultKind;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
-import java.util.Collections;
-import java.util.List;
+import javax.annotation.Nullable;
-/** To represent that the execution result is not ready to fetch. */
-public class NotReadyResult implements ResultSet {
+/** The {@link FetchResultsResponseBody} indicates the results is not ready. */
+public class NotReadyFetchResultResponse implements FetchResultsResponseBody {
- public static final NotReadyResult NOT_READY_RESULT = new NotReadyResult();
+ private final String nextResultUri;
- private NotReadyResult() {}
-
- @Override
- public ResultType getResultType() {
- return ResultType.NOT_READY;
+ public NotReadyFetchResultResponse(String nextResultUri) {
+ this.nextResultUri = nextResultUri;
}
@Override
- public Long getNextToken() {
- return 0L;
+ public ResultInfo getResults() {
+ throw new SqlGatewayException(
+ "The result is not ready. Please fetch results until the
result type is PAYLOAD or EOS.");
}
@Override
- public ResolvedSchema getResultSchema() {
- throw new UnsupportedOperationException(
- "Don't know the schema for the result. Please continue
fetching results until the result type is PAYLOAD or EOS.");
+ public ResultSet.ResultType getResultType() {
+ return ResultSet.ResultType.NOT_READY;
}
+ @Nullable
@Override
- public List<RowData> getData() {
- return Collections.emptyList();
+ public String getNextResultUri() {
+ return nextResultUri;
}
@Override
public boolean isQueryResult() {
- throw new UnsupportedOperationException(
+ throw new SqlGatewayException(
"Don't know whether a NOT_READY_RESULT is for a query. Please
continue fetching results until the result type is PAYLOAD or EOS.");
}
+ @Nullable
@Override
public JobID getJobID() {
- throw new UnsupportedOperationException(
- "Can't get job ID from a NOT_READY_RESULT. Please continue
fetching results until the result type is PAYLOAD or EOS.");
+ throw new SqlGatewayException(
+ "Don't know the Job ID with NOT_READY_RESULT. Please continue
fetching results until the result type is PAYLOAD or EOS.");
}
@Override
public ResultKind getResultKind() {
- throw new UnsupportedOperationException(
- "Can't get result kind from a NOT_READY_RESULT. Please
continue fetching results until the result type is PAYLOAD or EOS.");
+ throw new SqlGatewayException(
+ "Don't know the ResultKind with NOT_READY_RESULT. Please
continue fetching results until the result type is PAYLOAD or EOS.");
}
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultResponseBodyDeserializer.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultResponseBodyDeserializer.java
new file mode 100644
index 00000000000..792bfc6b1a1
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultResponseBodyDeserializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.serde;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultResponseBodyImpl;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import
org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+import static
org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_IS_QUERY_RESULT;
+import static
org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_JOB_ID;
+import static
org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_NEXT_RESULT_URI;
+import static
org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_RESULTS;
+import static
org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_RESULT_KIND;
+import static
org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_RESULT_TYPE;
+
+/** Deserializer to deserialize {@link FetchResultsResponseBody}. */
+public class FetchResultResponseBodyDeserializer extends
StdDeserializer<FetchResultsResponseBody> {
+
+ private static final long serialVersionUID = 1L;
+
+ protected FetchResultResponseBodyDeserializer() {
+ super(FetchResultsResponseBody.class);
+ }
+
+ @Override
+ public FetchResultsResponseBody deserialize(
+ JsonParser jsonParser, DeserializationContext context) throws
IOException {
+ final JsonNode jsonNode = jsonParser.readValueAsTree();
+
+ ResultSet.ResultType resultType = getResultType(jsonNode);
+ if (resultType == ResultSet.ResultType.NOT_READY) {
+ return new NotReadyFetchResultResponse(
+ jsonNode.required(FIELD_NEXT_RESULT_URI).asText());
+ }
+
+ boolean isQuery = jsonNode.required(FIELD_IS_QUERY_RESULT).asBoolean();
+ ResultInfo result =
+ context.readValue(
+
jsonNode.required(FIELD_RESULTS).traverse(jsonParser.getCodec()),
+ ResultInfo.class);
+ ResultKind resultKind =
ResultKind.valueOf(jsonNode.required(FIELD_RESULT_KIND).asText());
+ JobID jobID =
+ jsonNode.has(FIELD_JOB_ID)
+ ?
JobID.fromHexString(jsonNode.get(FIELD_JOB_ID).asText())
+ : null;
+ return new FetchResultResponseBodyImpl(
+ resultType,
+ isQuery,
+ jobID,
+ resultKind,
+ result,
+ jsonNode.has(FIELD_NEXT_RESULT_URI)
+ ? jsonNode.get(FIELD_NEXT_RESULT_URI).asText()
+ : null);
+ }
+
+ private ResultSet.ResultType getResultType(JsonNode serialized) throws
IOException {
+ if (!serialized.has(FIELD_RESULT_TYPE)) {
+ throw new IOException(
+ String.format("Can not find the required field %s.",
FIELD_RESULT_TYPE));
+ }
+ return
ResultSet.ResultType.valueOf(serialized.get(FIELD_RESULT_TYPE).asText());
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultsResponseBodySerializer.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultsResponseBodySerializer.java
new file mode 100644
index 00000000000..5972031ccb7
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultsResponseBodySerializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.serde;
+
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/** Serializer to serialize {@link FetchResultsResponseBody}. */
+public class FetchResultsResponseBodySerializer extends
StdSerializer<FetchResultsResponseBody> {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String FIELD_RESULT_TYPE = "resultType";
+ public static final String FIELD_IS_QUERY_RESULT = "isQueryResult";
+ public static final String FIELD_JOB_ID = "jobID";
+ public static final String FIELD_RESULT_KIND = "resultKind";
+ public static final String FIELD_RESULTS = "results";
+ public static final String FIELD_NEXT_RESULT_URI = "nextResultUri";
+
+ public FetchResultsResponseBodySerializer() {
+ super(FetchResultsResponseBody.class);
+ }
+
+ @Override
+ public void serialize(
+ FetchResultsResponseBody value, JsonGenerator gen,
SerializerProvider provider)
+ throws IOException {
+
+ gen.writeStartObject();
+ provider.defaultSerializeField(FIELD_RESULT_TYPE,
value.getResultType(), gen);
+ if (value.getResultType() != ResultSet.ResultType.NOT_READY) {
+ // PAYLOAD or EOS
+ provider.defaultSerializeField(FIELD_IS_QUERY_RESULT,
value.isQueryResult(), gen);
+ if (value.getJobID() != null) {
+ provider.defaultSerializeField(FIELD_JOB_ID,
value.getJobID().toHexString(), gen);
+ }
+ provider.defaultSerializeField(FIELD_RESULT_KIND,
value.getResultKind(), gen);
+ provider.defaultSerializeField(FIELD_RESULTS, value.getResults(),
gen);
+ }
+ if (value.getNextResultUri() != null) {
+ gen.writeStringField(FIELD_NEXT_RESULT_URI,
value.getNextResultUri());
+ }
+ gen.writeEndObject();
+ }
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
index c7ec5b05ff9..c34fb84dced 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
@@ -20,75 +20,121 @@ package org.apache.flink.table.gateway.rest.serde;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.results.ResultSetImpl;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.util.Preconditions;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
+
/**
* A {@code ResultInfo} contains information of a {@link ResultSet}. It is
designed for transferring
* the information of ResultSet via REST. For its serialization and
deserialization, See:
*
- * <p>{@link ResultInfoJsonSerializer} and {@link ResultInfoJsonDeserializer}
+ * <p>{@link ResultInfoSerializer} and {@link ResultInfoDeserializer}
*/
@Internal
+@JsonSerialize(using = ResultInfoSerializer.class)
+@JsonDeserialize(using = ResultInfoDeserializer.class)
public class ResultInfo {
- // Columns
- public static final String FIELD_NAME_COLUMN_INFOS = "columns";
-
- // RowData
- public static final String FIELD_NAME_DATA = "data";
- public static final String FIELD_NAME_KIND = "kind";
- public static final String FIELD_NAME_FIELDS = "fields";
-
private final List<ColumnInfo> columnInfos;
private final List<RowData> data;
+ private final RowFormat rowFormat;
- public ResultInfo(List<ColumnInfo> columnInfos, List<RowData> data) {
+ ResultInfo(List<ColumnInfo> columnInfos, List<RowData> data, RowFormat
rowFormat) {
this.columnInfos = columnInfos;
this.data = data;
+ this.rowFormat = rowFormat;
}
- public static ResultInfo createResultInfo(ResultSet resultSet) {
+ public static ResultInfo createResultInfo(ResultSet resultSet, RowFormat
rowFormat) {
+ Preconditions.checkArgument(resultSet.getResultType() !=
ResultSet.ResultType.NOT_READY);
+ List<RowData> data = resultSet.getData();
+
+ switch (rowFormat) {
+ case JSON:
+ break;
+ case PLAIN_TEXT:
+ RowDataToStringConverter converter = ((ResultSetImpl)
resultSet).getConverter();
+ data =
+ data.stream()
+ .map(rowData -> convertToPlainText(rowData,
converter))
+ .collect(Collectors.toList());
+
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ String.format("Unsupported row format: %s.",
rowFormat));
+ }
+
return new ResultInfo(
resultSet.getResultSchema().getColumns().stream()
.map(ColumnInfo::toColumnInfo)
.collect(Collectors.toList()),
- resultSet.getData());
+ data,
+ rowFormat);
}
+ /** Get the column info of the data. */
public List<ColumnInfo> getColumnInfos() {
return Collections.unmodifiableList(columnInfos);
}
+ /** Get the data. */
public List<RowData> getData() {
return data;
}
- public List<RowData.FieldGetter> getFieldGetters() {
- List<LogicalType> columnTypes =
-
columnInfos.stream().map(ColumnInfo::getLogicalType).collect(Collectors.toList());
- return IntStream.range(0, columnTypes.size())
- .mapToObj(i -> RowData.createFieldGetter(columnTypes.get(i),
i))
- .collect(Collectors.toList());
+ /** Get the row format about the data. */
+ public RowFormat getRowFormat() {
+ return rowFormat;
}
+ /**
+ * Create the {@link FieldGetter} to get column value in the results.
+ *
+ * <p>With {@code JSON} format, it uses the {@link ResolvedSchema} to
build the getters.
+ * However, it uses {@link StringData}'s {@link FieldGetter} to get the
column values.
+ */
+ public List<FieldGetter> getFieldGetters() {
+ if (rowFormat == RowFormat.JSON) {
+ List<LogicalType> columnTypes =
+ columnInfos.stream()
+ .map(ColumnInfo::getLogicalType)
+ .collect(Collectors.toList());
+ return IntStream.range(0, columnTypes.size())
+ .mapToObj(i ->
RowData.createFieldGetter(columnTypes.get(i), i))
+ .collect(Collectors.toList());
+ } else {
+ return IntStream.range(0, columnInfos.size())
+ .mapToObj(i -> RowData.createFieldGetter(STRING_TYPE, i))
+ .collect(Collectors.toList());
+ }
+ }
+
+ /** Get the schemas of the results. */
public ResolvedSchema getResultSchema() {
return ResolvedSchema.of(
columnInfos.stream().map(ColumnInfo::toColumn).collect(Collectors.toList()));
}
- @Override
- public int hashCode() {
- return Objects.hash(columnInfos, data);
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -98,14 +144,38 @@ public class ResultInfo {
return false;
}
ResultInfo that = (ResultInfo) o;
- return Objects.equals(columnInfos, that.columnInfos) &&
Objects.equals(data, that.data);
+ return Objects.equals(columnInfos, that.columnInfos)
+ && Objects.equals(data, that.data)
+ && rowFormat == that.rowFormat;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(columnInfos, data, rowFormat);
}
@Override
public String toString() {
- return String.format(
- "ResultInfo{\n columnInfos=[%s],\n rows=[%s]\n}",
-
columnInfos.stream().map(Object::toString).collect(Collectors.joining(",")),
-
data.stream().map(Object::toString).collect(Collectors.joining(",")));
+ return "ResultInfo{"
+ + "columnInfos="
+ + columnInfos
+ + ", data="
+ + data
+ + ", rowFormat="
+ + rowFormat
+ + '}';
+ }
+
+ private static RowData convertToPlainText(RowData rowData,
RowDataToStringConverter converter) {
+ String[] plainTexts = converter.convert(rowData);
+ // The RowDataToStringConverter will convert null to a specific
string. Here reassign it to
+ // null and let the caller determine how to use it.
+ IntStream.range(0, rowData.getArity())
+ .filter(rowData::isNullAt)
+ .forEach(i -> plainTexts[i] = null);
+
+ return GenericRowData.ofKind(
+ rowData.getRowKind(),
+
Arrays.stream(plainTexts).map(StringData::fromString).toArray());
}
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoDeserializer.java
similarity index 63%
rename from
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java
rename to
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoDeserializer.java
index c842ebc6051..113c3268d7e 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoDeserializer.java
@@ -23,6 +23,8 @@ import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonToRowDataConverters;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CollectionUtil;
@@ -39,22 +41,23 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND;
+import static
org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_COLUMN_INFOS;
+import static
org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_DATA;
+import static
org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_FIELDS;
+import static
org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_KIND;
+import static
org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_ROW_FORMAT;
/**
- * Json deserializer for {@link ResultInfo}.
+ * Deserializer for {@link ResultInfo}.
*
- * @see ResultInfoJsonSerializer for the reverse operation.
+ * @see ResultInfoSerializer for the reverse operation.
*/
@Internal
-public class ResultInfoJsonDeserializer extends StdDeserializer<ResultInfo> {
+public class ResultInfoDeserializer extends StdDeserializer<ResultInfo> {
private static final long serialVersionUID = 1L;
- public ResultInfoJsonDeserializer() {
+ public ResultInfoDeserializer() {
super(ResultInfo.class);
}
@@ -74,27 +77,52 @@ public class ResultInfoJsonDeserializer extends
StdDeserializer<ResultInfo> {
.treeToValue(
node.get(FIELD_NAME_COLUMN_INFOS),
ColumnInfo[].class));
- // generate converters for all fields of each row
- List<JsonToRowDataConverters.JsonToRowDataConverter> converters =
- columnInfos.stream()
- .map(ColumnInfo::getLogicalType)
- .map(TO_ROW_DATA_CONVERTERS::createConverter)
- .collect(Collectors.toList());
+ // deserialize RowFormat
+ RowFormat rowFormat =
+
RowFormat.valueOf(node.get(FIELD_NAME_ROW_FORMAT).asText().toUpperCase());
// deserialize rows
- List<RowData> data = deserializeData((ArrayNode)
node.get(FIELD_NAME_DATA), converters);
+ List<RowData> data =
+ deserializeData((ArrayNode) node.get(FIELD_NAME_DATA),
columnInfos, rowFormat);
- return new ResultInfo(columnInfos, data);
+ return new ResultInfo(columnInfos, data, rowFormat);
}
private List<RowData> deserializeData(
- ArrayNode serializedRows,
- List<JsonToRowDataConverters.JsonToRowDataConverter> converters) {
+ ArrayNode serializedRows, List<ColumnInfo> columnInfos, RowFormat
rowFormat) {
+ // generate converters for all fields of each row
+ List<JsonToRowDataConverters.JsonToRowDataConverter> converters =
+ buildToRowDataConverters(columnInfos, rowFormat);
+
List<RowData> data = new ArrayList<>();
serializedRows.forEach(rowDataNode ->
data.add(convertToRowData(rowDataNode, converters)));
return data;
}
+ private List<JsonToRowDataConverters.JsonToRowDataConverter>
buildToRowDataConverters(
+ List<ColumnInfo> columnInfos, RowFormat rowFormat) {
+ if (rowFormat == RowFormat.JSON) {
+ return columnInfos.stream()
+ .map(ColumnInfo::getLogicalType)
+ .map(TO_ROW_DATA_CONVERTERS::createConverter)
+ .collect(Collectors.toList());
+ } else if (rowFormat == RowFormat.PLAIN_TEXT) {
+ return IntStream.range(0, columnInfos.size())
+ .mapToObj(
+ i ->
+
(JsonToRowDataConverters.JsonToRowDataConverter)
+ jsonNode ->
+ jsonNode.isNull()
+ ? null
+ :
StringData.fromString(
+
jsonNode.asText()))
+ .collect(Collectors.toList());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unknown row format: %s.", rowFormat));
+ }
+ }
+
private GenericRowData convertToRowData(
JsonNode serializedRow,
List<JsonToRowDataConverters.JsonToRowDataConverter> converters) {
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
similarity index 57%
rename from
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java
rename to
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
index 762e634b39f..b796d32ba3e 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.RowDataToJsonConverters;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -38,29 +39,36 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS;
-import static
org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND;
-
/**
- * Json serializer for {@link ResultInfo}.
+ * Serializer for {@link ResultInfo}.
*
- * @see ResultInfoJsonDeserializer for the reverse operation.
+ * @see ResultInfoDeserializer for the reverse operation.
*/
@Internal
-public class ResultInfoJsonSerializer extends StdSerializer<ResultInfo> {
+public class ResultInfoSerializer extends StdSerializer<ResultInfo> {
+
+ // Columns
+ public static final String FIELD_NAME_COLUMN_INFOS = "columns";
+
+ // RowData
+ public static final String FIELD_NAME_DATA = "data";
+ public static final String FIELD_NAME_KIND = "kind";
+ public static final String FIELD_NAME_FIELDS = "fields";
+
+ // RowFormat
+ public static final String FIELD_NAME_ROW_FORMAT = "rowFormat";
private static final long serialVersionUID = 1L;
- public ResultInfoJsonSerializer() {
+ public ResultInfoSerializer() {
super(ResultInfo.class);
}
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
new RowDataToJsonConverters(
- TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL, "");
+ TimestampFormat.ISO_8601,
JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
@Override
public void serialize(
@@ -74,8 +82,12 @@ public class ResultInfoJsonSerializer extends
StdSerializer<ResultInfo> {
serializerProvider.defaultSerializeField(
FIELD_NAME_COLUMN_INFOS, resultInfo.getColumnInfos(),
jsonGenerator);
+ // serialize RowFormat
+ serializerProvider.defaultSerializeField(
+ FIELD_NAME_ROW_FORMAT, resultInfo.getRowFormat(),
jsonGenerator);
+
// serialize data
- serializeData(resultInfo.getData(), buildToJsonConverters(resultInfo),
jsonGenerator);
+ serializeData(resultInfo.getData(),
buildRowDataConverters(resultInfo), jsonGenerator);
jsonGenerator.writeEndObject();
}
@@ -110,29 +122,49 @@ public class ResultInfoJsonSerializer extends
StdSerializer<ResultInfo> {
return serializedRowData;
}
- /** Composes the FieldGetter and RowDataToJsonConverter. */
- private List<Function<RowData, JsonNode>> buildToJsonConverters(ResultInfo
resultInfo) {
- List<RowDataToJsonConverters.RowDataToJsonConverter> converters =
- resultInfo.getColumnInfos().stream()
- .map(ColumnInfo::getLogicalType)
- .map(TO_JSON_CONVERTERS::createConverter)
- .collect(Collectors.toList());
-
+ private List<Function<RowData, JsonNode>>
buildRowDataConverters(ResultInfo resultInfo) {
+ RowFormat rowFormat = resultInfo.getRowFormat();
List<RowData.FieldGetter> fieldGetters = resultInfo.getFieldGetters();
-
- return IntStream.range(0, converters.size())
- .mapToObj(
- i ->
- (Function<RowData, JsonNode>)
- rowData ->
- converters
- .get(i)
- .convert(
- OBJECT_MAPPER,
- null,
- fieldGetters
- .get(i)
-
.getFieldOrNull(rowData)))
- .collect(Collectors.toList());
+ if (rowFormat == RowFormat.JSON) {
+ List<RowDataToJsonConverters.RowDataToJsonConverter> converters =
+ resultInfo.getColumnInfos().stream()
+ .map(ColumnInfo::getLogicalType)
+ .map(TO_JSON_CONVERTERS::createConverter)
+ .collect(Collectors.toList());
+
+ return IntStream.range(0, converters.size())
+ .mapToObj(
+ i ->
+ (Function<RowData, JsonNode>)
+ rowData ->
+ converters
+ .get(i)
+ .convert(
+
OBJECT_MAPPER,
+ null,
+
fieldGetters
+
.get(i)
+
.getFieldOrNull(
+
rowData)))
+ .collect(Collectors.toList());
+ } else if (rowFormat == RowFormat.PLAIN_TEXT) {
+ return IntStream.range(0, resultInfo.getColumnInfos().size())
+ .mapToObj(
+ i ->
+ (Function<RowData, JsonNode>)
+ rowData -> {
+ Object value =
+
fieldGetters.get(i).getFieldOrNull(rowData);
+ return value == null
+ ?
OBJECT_MAPPER.getNodeFactory().nullNode()
+ : OBJECT_MAPPER
+
.getNodeFactory()
+
.textNode(value.toString());
+ })
+ .collect(Collectors.toList());
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Unknown row format: %s.", rowFormat));
+ }
}
}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowFormat.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowFormat.java
new file mode 100644
index 00000000000..3e931935d89
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+
+/** Describes the serialization format of {@link RowData} in the {@link
ResultSet}. */
+@PublicEvolving
+public enum RowFormat {
+ /**
+ * Serialize the RowData with JSON format. The serialized value can be
deserialized by means of
+ * structural expressions.
+ */
+ JSON,
+
+ /** Serialize the RowData to SQL-compliant, plain strings. */
+ PLAIN_TEXT
+}
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index dc088fae5b1..4e16cb5dd6c 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -27,6 +27,7 @@ import
org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.OperationInfo;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
@@ -45,8 +46,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Supplier;
-import static
org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
-
/** Manager for the {@link Operation}. */
@Internal
public class OperationManager {
@@ -348,7 +347,7 @@ public class OperationManager {
} else if (currentStatus == OperationStatus.RUNNING
|| currentStatus == OperationStatus.PENDING
|| currentStatus == OperationStatus.INITIALIZED) {
- return NOT_READY_RESULT;
+ return NotReadyResult.INSTANCE;
} else {
throw new SqlGatewayException(
String.format(
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
index e56b6024c02..45e61f9295e 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
@@ -30,7 +30,7 @@ import java.util.List;
/** To represent that the execution result is not ready to fetch. */
public class NotReadyResult implements ResultSet {
- public static final NotReadyResult NOT_READY_RESULT = new NotReadyResult();
+ public static final NotReadyResult INSTANCE = new NotReadyResult();
private NotReadyResult() {}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index 463b567518b..f14ea52468c 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.service.utils.Constants;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.table.gateway.utils.SqlScriptReader;
import org.apache.flink.table.gateway.utils.TestSqlStatement;
@@ -28,16 +29,19 @@ import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -59,7 +63,7 @@ import java.util.Map;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.regex.Pattern;
-import java.util.stream.Stream;
+import java.util.stream.Collectors;
import static
org.apache.flink.table.gateway.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
import static
org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
@@ -68,6 +72,7 @@ import static
org.apache.flink.util.Preconditions.checkNotNull;
import static org.assertj.core.api.Assertions.assertThat;
/** Base ITCase tests for statements. */
+@ExtendWith(ParameterizedTestExtension.class)
public abstract class AbstractSqlGatewayStatementITCase extends
AbstractTestBase {
private static final Logger LOG =
@@ -89,6 +94,13 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
private final Map<String, String> replaceVars = new HashMap<>();
+ @Parameter public TestParameters parameters;
+
+ @Parameters(name = "parameters={0}")
+ public static List<TestParameters> parameters() throws Exception {
+ return
listFlinkSqlTests().stream().map(TestParameters::new).collect(Collectors.toList());
+ }
+
@BeforeAll
public static void setUp() {
service = SQL_GATEWAY_SERVICE_EXTENSION.getService();
@@ -111,11 +123,10 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
}
- @ParameterizedTest
- @MethodSource("listFlinkSqlTests")
- public void testFlinkSqlStatements(String sqlPath) throws Exception {
- resetSessionForFlinkSqlStatements();
- runTest(sqlPath);
+ @TestTemplate
+ public void testFlinkSqlStatements() throws Exception {
+ prepareEnvironment();
+ runTest(parameters.getSqlPath());
}
/**
@@ -153,6 +164,25 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
// Utility
//
-------------------------------------------------------------------------------------------
+ /** Parameters of the test spec. */
+ protected static class TestParameters {
+
+ protected final String sqlPath;
+
+ public TestParameters(String sqlPath) {
+ this.sqlPath = sqlPath;
+ }
+
+ public String getSqlPath() {
+ return sqlPath;
+ }
+
+ @Override
+ public String toString() {
+ return "TestParameters{" + "sqlPath='" + sqlPath + '\'' + '}';
+ }
+ }
+
/** Mark the output type. */
public enum Tag {
INFO("!info"),
@@ -222,7 +252,7 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
values);
}
- private static Stream<String> listFlinkSqlTests() throws Exception {
+ protected static List<String> listFlinkSqlTests() throws Exception {
final File jarFile =
new File(
AbstractSqlGatewayStatementITCase.class
@@ -244,14 +274,13 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
}
}
}
- return files.stream();
+ return files;
} else {
return listTestSpecInTheSameModule(RESOURCE_DIR);
}
}
- protected static Stream<String> listTestSpecInTheSameModule(String
resourceDir)
- throws Exception {
+ protected static List<String> listTestSpecInTheSameModule(String
resourceDir) throws Exception {
return IOUtils.readLines(
checkNotNull(
AbstractSqlGatewayStatementITCase.class
@@ -259,7 +288,8 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
.getResourceAsStream(resourceDir)),
StandardCharsets.UTF_8)
.stream()
- .map(name -> Paths.get(resourceDir, name).toString());
+ .map(name -> Paths.get(resourceDir, name).toString())
+ .collect(Collectors.toList());
}
protected void runTest(String sqlPath) throws Exception {
@@ -269,7 +299,7 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
assertThat(String.join("",
runStatements(testSqlStatements))).isEqualTo(in);
}
- protected void resetSessionForFlinkSqlStatements() throws Exception {}
+ protected void prepareEnvironment() throws Exception {}
/**
* Returns printed results for each ran SQL statements.
@@ -295,6 +325,11 @@ public abstract class AbstractSqlGatewayStatementITCase
extends AbstractTestBase
replaceNodeIdInOperator(
iterator.next().getString(0).toString()))
+ "\n");
+ } else if (schema.getColumn(0)
+ .map(col -> col.getName().equals(Constants.JOB_ID))
+ .orElse(false)) {
+ // ignore output of the job id
+ return Tag.INFO.addTag("Job ID:\n");
} else {
ByteArrayOutputStream outContent = new ByteArrayOutputStream();
PrintStyle style =
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
index 47c26b4f543..de6a3519b72 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
@@ -33,6 +33,7 @@ import
org.apache.flink.table.gateway.rest.message.operation.OperationMessagePar
import
org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
import
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
import
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
import org.junit.jupiter.api.Test;
@@ -44,7 +45,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import static
org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -129,7 +129,7 @@ class OperationRelatedITCase extends RestAPIITCaseBase {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ignored) {
}
- return NOT_READY_RESULT;
+ return NotReadyResult.INSTANCE;
});
assertThat(operationHandle).isNotNull();
return Arrays.asList(sessionHandleId,
operationHandle.getIdentifier().toString());
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
index 2970d2f25a1..fb27468392a 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
@@ -40,8 +40,8 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
import static org.apache.flink.util.Preconditions.checkNotNull;
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
index e52a42ee354..dac0519a8b2 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
@@ -70,8 +70,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
index 4fc378e9029..f87558f171a 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
@@ -37,13 +37,16 @@ import
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
import
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
import
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
import
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
-import
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
import
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils;
import org.apache.flink.table.gateway.rest.util.TestingRestClient;
import
org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -51,6 +54,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.time.Duration;
@@ -58,9 +63,13 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import static
org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
import static
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -70,7 +79,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
* Test basic logic of handlers inherited from {@link
AbstractSqlGatewayRestHandler} in statement
* related cases.
*/
-class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementITCase {
+public class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementITCase {
+
+ private static final Logger LOG =
+
LoggerFactory.getLogger(SqlGatewayRestEndpointStatementITCase.class);
@RegisterExtension
@Order(3)
@@ -82,7 +94,7 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
ExecuteStatementHeaders.getInstance();
private static SessionMessageParameters sessionMessageParameters;
private static final FetchResultsHeaders fetchResultsHeaders =
- FetchResultsHeaders.getInstance();
+ FetchResultsHeaders.getDefaultInstance();
private static final int OPERATION_WAIT_SECONDS = 100;
private static final String PATTERN1 = "Caused by: ";
@@ -105,6 +117,17 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
restClient.shutdown();
}
+ @Parameters(name = "parameters={0}")
+ public static List<TestParameters> parameters() throws Exception {
+ return listFlinkSqlTests().stream()
+ .flatMap(
+ path ->
+ Stream.of(
+ new RestTestParameters(path,
RowFormat.JSON),
+ new RestTestParameters(path,
RowFormat.PLAIN_TEXT)))
+ .collect(Collectors.toList());
+ }
+
@BeforeEach
@Override
public void before(@TempDir Path temporaryFolder) throws Exception {
@@ -153,11 +176,8 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
ResultInfo resultInfo = fetchResultsResponseBody.getResults();
assertThat(resultInfo).isNotNull();
- String resultType = fetchResultsResponseBody.getResultType();
- assertThat(
- Arrays.asList(
- ResultSet.ResultType.PAYLOAD.name(),
- ResultSet.ResultType.EOS.name()))
+ ResultSet.ResultType resultType =
fetchResultsResponseBody.getResultType();
+ assertThat(Arrays.asList(ResultSet.ResultType.PAYLOAD,
ResultSet.ResultType.EOS))
.contains(resultType);
ResolvedSchema resultSchema = resultInfo.getResultSchema();
@@ -165,25 +185,31 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
return toString(
StatementType.match(statement),
resultSchema,
- new RowDataToStringConverterImpl(
- resultSchema.toPhysicalRowDataType(),
- DateTimeUtils.UTC_ZONE.toZoneId(),
-
SqlGatewayRestEndpointStatementITCase.class.getClassLoader(),
- false),
+ ((RestTestParameters) parameters).getRowFormat() ==
RowFormat.JSON
+ ? new RowDataToStringConverterImpl(
+ resultSchema.toPhysicalRowDataType(),
+ DateTimeUtils.UTC_ZONE.toZoneId(),
+
SqlGatewayRestEndpointStatementITCase.class.getClassLoader(),
+ false)
+ : SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
new RowDataIterator(sessionHandle, operationHandle));
}
FetchResultsResponseBody fetchResults(
SessionHandle sessionHandle, OperationHandle operationHandle, Long
token)
throws Exception {
- FetchResultsTokenParameters fetchResultsTokenParameters =
- new FetchResultsTokenParameters(sessionHandle,
operationHandle, token);
+ FetchResultsMessageParameters fetchResultsMessageParameters =
+ new FetchResultsMessageParameters(
+ sessionHandle,
+ operationHandle,
+ token,
+ ((RestTestParameters) parameters).getRowFormat());
CompletableFuture<FetchResultsResponseBody> response =
restClient.sendRequest(
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
fetchResultsHeaders,
- fetchResultsTokenParameters,
+ fetchResultsMessageParameters,
EmptyRequestBody.getInstance());
return response.get();
}
@@ -202,6 +228,31 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
.equals(RuntimeExecutionMode.STREAMING);
}
+ private static class RestTestParameters extends TestParameters {
+
+ private final RowFormat rowFormat;
+
+ public RestTestParameters(String sqlPath, RowFormat rowFormat) {
+ super(sqlPath);
+ this.rowFormat = rowFormat;
+ }
+
+ public RowFormat getRowFormat() {
+ return rowFormat;
+ }
+
+ @Override
+ public String toString() {
+ return "RestTestParameters{"
+ + "sqlPath='"
+ + sqlPath
+ + '\''
+ + ", rowFormat="
+ + rowFormat
+ + '}';
+ }
+ }
+
private class RowDataIterator implements Iterator<RowData> {
private final SessionHandle sessionHandle;
@@ -223,10 +274,11 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
try {
fetch();
} catch (Exception ignored) {
+ LOG.error("Failed to fetch results.", ignored);
}
}
- return token != null;
+ return fetchedRows.hasNext();
}
@Override
@@ -238,18 +290,10 @@ class SqlGatewayRestEndpointStatementITCase extends
AbstractSqlGatewayStatementI
FetchResultsResponseBody fetchResultsResponseBody =
fetchResults(sessionHandle, operationHandle, token);
- String nextResultUri = fetchResultsResponseBody.getNextResultUri();
- token = parseTokenFromUri(nextResultUri);
-
+ token =
+ SqlGatewayRestEndpointTestUtils.parseToken(
+ fetchResultsResponseBody.getNextResultUri());
fetchedRows =
fetchResultsResponseBody.getResults().getData().iterator();
}
}
-
- private static Long parseTokenFromUri(String uri) {
- if (uri == null || uri.length() == 0) {
- return null;
- }
- String[] split = uri.split("/");
- return Long.valueOf(split[split.length - 1]);
- }
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
index 0fa3389317f..e1410b8b8de 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
@@ -30,8 +30,8 @@ import org.junit.jupiter.api.Test;
import static
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
import static
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory.IDENTIFIER;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getSqlGatewayRestOptionFullName;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getSqlGatewayRestOptionFullName;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
index 0fe21bf4cf1..5efeb7d0a79 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
@@ -29,8 +29,11 @@ import
org.apache.flink.table.gateway.rest.message.session.SessionMessageParamet
import
org.apache.flink.table.gateway.rest.message.statement.CompleteStatementRequestBody;
import
org.apache.flink.table.gateway.rest.message.statement.CompleteStatementResponseBody;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import java.nio.file.Path;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -43,20 +46,25 @@ import static org.assertj.core.api.Assertions.assertThat;
*/
public class StatementRelatedITCase extends RestAPIITCaseBase {
- @Test
- public void testCompleteStatement() throws Exception {
+ private SessionHandle sessionHandle;
+ private SessionMessageParameters sessionMessageParameters;
+ private @TempDir Path tempDir;
+
+ @BeforeEach
+ public void setUp() throws Exception {
CompletableFuture<OpenSessionResponseBody> response =
sendRequest(
OpenSessionHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
new OpenSessionRequestBody(null, null));
- SessionHandle sessionHandle =
- new
SessionHandle(UUID.fromString(response.get().getSessionHandle()));
+ sessionHandle = new
SessionHandle(UUID.fromString(response.get().getSessionHandle()));
- SessionMessageParameters sessionMessageParameters =
- new SessionMessageParameters(sessionHandle);
+ sessionMessageParameters = new SessionMessageParameters(sessionHandle);
+ }
+ @Test
+ public void testCompleteStatement() throws Exception {
CompletableFuture<CompleteStatementResponseBody>
completeStatementResponse =
sendRequest(
CompleteStatementHeaders.getINSTANCE(),
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
index fe86784f78c..5cb765be07e 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
@@ -22,8 +22,13 @@ import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+import
org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
@@ -33,7 +38,9 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.Si
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
import java.math.BigDecimal;
@@ -45,7 +52,6 @@ import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -54,6 +60,7 @@ import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import static org.apache.flink.table.api.DataTypes.ARRAY;
import static org.apache.flink.table.api.DataTypes.BIGINT;
@@ -76,7 +83,7 @@ import static
org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZON
import static org.apache.flink.table.api.DataTypes.TINYINT;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link ResultInfoJsonSerializer} and {@link
ResultInfoJsonDeserializer}. */
+/** Tests for {@link ResultInfoSerializer} and {@link ResultInfoDeserializer}.
*/
public class ResultInfoJsonSerDeTest {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Row testRow = initRow();
@@ -84,69 +91,75 @@ public class ResultInfoJsonSerDeTest {
@BeforeAll
public static void setUp() {
SimpleModule simpleModule = new SimpleModule();
- simpleModule.addSerializer(ResultInfo.class, new
ResultInfoJsonSerializer());
- simpleModule.addDeserializer(ResultInfo.class, new
ResultInfoJsonDeserializer());
+ simpleModule.addSerializer(ResultInfo.class, new
ResultInfoSerializer());
+ simpleModule.addDeserializer(ResultInfo.class, new
ResultInfoDeserializer());
OBJECT_MAPPER.registerModule(simpleModule);
}
- @Test
- public void testResultInfoSerDeWithSingleRow() throws Exception {
- serDeTest(Collections.singletonList(testRow));
+ @ParameterizedTest
+ @EnumSource(RowFormat.class)
+ public void testResultInfoSerDeWithSingleRow(RowFormat rowFormat) throws
Exception {
+ serDeTest(Collections.singletonList(testRow), rowFormat);
}
- @Test
- public void testResultInfoSerDeWithMultiRowData() throws Exception {
- List<Row> rows = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- rows.add(testRow);
- }
- serDeTest(rows);
+ @ParameterizedTest
+ @EnumSource(RowFormat.class)
+ public void testResultInfoSerDeWithMultiRowData(RowFormat rowFormat)
throws Exception {
+ serDeTest(Collections.nCopies(10, testRow), rowFormat);
}
- @Test
- public void testResultInfoSerDeWithNullValues() throws Exception {
- List<Row> rows = new ArrayList<>();
- List<Integer> positions = new ArrayList<>();
- for (int i = 0; i < 18; i++) {
- positions.add(new Random().nextInt(18));
- }
- for (int i = 0; i < 10; i++) {
- rows.add(getTestRowDataWithNullValues(initRow(), positions));
- }
- serDeTest(rows);
+ @ParameterizedTest
+ @EnumSource(RowFormat.class)
+ public void testResultInfoSerDeWithNullValues(RowFormat rowFormat) throws
Exception {
+ List<Integer> positions =
+ IntStream.range(0, 18)
+ .mapToObj(i -> new Random().nextInt(18))
+ .collect(Collectors.toList());
+
+ serDeTest(
+ Collections.nCopies(10,
getTestRowDataWithNullValues(initRow(), positions)),
+ rowFormat);
}
- @Test
- public void testDeserializationFromJson() throws Exception {
- URL url =
ResultInfoJsonSerDeTest.class.getClassLoader().getResource("resultInfo.txt");
+ @ParameterizedTest
+ @ValueSource(strings = {"result_info_json_format.txt",
"result_info_plain_text_format.txt"})
+ public void testDeserializationFromJson(String fileName) throws Exception {
+ URL url =
ResultInfoJsonSerDeTest.class.getClassLoader().getResource(fileName);
String input =
IOUtils.toString(Preconditions.checkNotNull(url),
StandardCharsets.UTF_8).trim();
ResultInfo deserializedResult = OBJECT_MAPPER.readValue(input,
ResultInfo.class);
assertThat(OBJECT_MAPPER.writeValueAsString(deserializedResult)).isEqualTo(input);
}
- private void serDeTest(List<Row> rows) throws IOException {
- List<RowData> rowDataList =
+ private void serDeTest(List<Row> rows, RowFormat rowFormat) throws
IOException {
+ List<RowData> rowDatas =
rows.stream().map(this::convertToInternal).collect(Collectors.toList());
+ if (rowFormat == RowFormat.PLAIN_TEXT) {
+ rowDatas =
+ rowDatas.stream()
+ .map(this::toPlainTextFormatRowData)
+ .collect(Collectors.toList());
+ }
+
ResolvedSchema testResolvedSchema = getTestResolvedSchema(getFields());
ResultInfo testResultInfo =
new ResultInfo(
testResolvedSchema.getColumns().stream()
.map(ColumnInfo::toColumnInfo)
.collect(Collectors.toList()),
- rowDataList);
+ rowDatas,
+ rowFormat);
// test serialization & deserialization
String result = OBJECT_MAPPER.writeValueAsString(testResultInfo);
ResultInfo resultInfo = OBJECT_MAPPER.readValue(result,
ResultInfo.class);
+ // validate schema
assertThat(resultInfo.getResultSchema().toString())
.isEqualTo(testResultInfo.getResultSchema().toString());
- List<RowData> data = resultInfo.getData();
- for (int i = 0; i < data.size(); i++) {
- assertThat(convertToExternal(data.get(i),
ROW(getFields()))).isEqualTo(rows.get(i));
- }
+ // validate data
+ assertDataWithFormat(resultInfo.getData(), rows, rowFormat);
}
private static Row initRow() {
@@ -269,4 +282,42 @@ public class ResultInfoJsonSerDeTest {
DataFormatConverters.getConverterForDataType(ROW(getFields()));
return converter.toInternal(row);
}
+
+ private RowData toPlainTextFormatRowData(RowData rowData) {
+ RowDataToStringConverter converter =
+ new RowDataToStringConverterImpl(
+
getTestResolvedSchema(getFields()).toPhysicalRowDataType(),
+ DateTimeUtils.UTC_ZONE.toZoneId(),
+ ResultInfoJsonSerDeTest.class.getClassLoader(),
+ false);
+
+ StringData[] plainText =
+ Arrays.stream(converter.convert(rowData))
+ .map(StringData::fromString)
+ .toArray(StringData[]::new);
+
+ return GenericRowData.ofKind(rowData.getRowKind(), (Object[])
plainText);
+ }
+
+ private void assertDataWithFormat(
+ List<RowData> expected, List<Row> actual, RowFormat rowFormat) {
+ if (rowFormat == RowFormat.JSON) {
+ for (int i = 0; i < expected.size(); i++) {
+ assertThat(convertToExternal(expected.get(i),
ROW(getFields())))
+ .isEqualTo(actual.get(i));
+ }
+ } else {
+ for (int i = 0; i < expected.size(); i++) {
+ assertPlainTextFormatData(
+ expected.get(i),
+
toPlainTextFormatRowData(convertToInternal(actual.get(i))));
+ }
+ }
+ }
+
+ private void assertPlainTextFormatData(RowData expected, RowData actual) {
+ for (int i = 0; i < expected.getArity(); i++) {
+ assertThat(expected.getString(i)).isEqualTo(actual.getString(i));
+ }
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java
index 4116669e756..2b69d5f68f7 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java
@@ -32,8 +32,8 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.function.Supplier;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
import static org.apache.flink.util.Preconditions.checkNotNull;
/** A simple {@link Extension} that manages the lifecycle of the {@link
SqlGatewayRestEndpoint}. */
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/RestConfigUtils.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
similarity index 85%
rename from
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/RestConfigUtils.java
rename to
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
index 09092e17180..177427b1523 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/RestConfigUtils.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
@@ -21,13 +21,15 @@ package org.apache.flink.table.gateway.rest.util;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
+import javax.annotation.Nullable;
+
import static
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
import static
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getSqlGatewayOptionPrefix;
import static
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory.IDENTIFIER;
import static
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory.rebuildRestEndpointOptions;
/** The tools to get configuration in test cases. */
-public class RestConfigUtils {
+public class SqlGatewayRestEndpointTestUtils {
/** Get the full name of sql gateway rest endpoint options. */
public static String getSqlGatewayRestOptionFullName(String key) {
@@ -62,4 +64,16 @@ public class RestConfigUtils {
}
return config;
}
+
+ /** Parse token from the result uri. */
+ public static @Nullable Long parseToken(@Nullable String nextResultUri) {
+ if (nextResultUri == null || nextResultUri.length() == 0) {
+ return null;
+ }
+ String[] split = nextResultUri.split("/");
+ // remove query string
+ String s = split[split.length - 1];
+ s = s.replaceAll("\\?.*", "");
+ return Long.valueOf(s);
+ }
}
diff --git
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index cd9993e7fed..03d338fed83 100644
---
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -54,6 +54,7 @@ import
org.apache.flink.table.gateway.api.session.SessionHandle;
import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
import org.apache.flink.table.gateway.service.session.SessionManager;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
@@ -108,7 +109,6 @@ import static
org.apache.flink.table.functions.FunctionKind.AGGREGATE;
import static org.apache.flink.table.functions.FunctionKind.OTHER;
import static org.apache.flink.table.functions.FunctionKind.SCALAR;
import static
org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
-import static
org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
import static
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
import static
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession;
import static
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchResults;
@@ -288,7 +288,7 @@ public class SqlGatewayServiceITCase {
startRunningLatch.await();
assertThat(fetchResults(service, sessionHandle, operationHandle))
- .isEqualTo(NOT_READY_RESULT);
+ .isEqualTo(NotReadyResult.INSTANCE);
endRunningLatch.countDown();
}
diff --git a/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt
b/flink-table/flink-sql-gateway/src/test/resources/result_info_json_format.txt
similarity index 55%
rename from flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt
rename to
flink-table/flink-sql-gateway/src/test/resources/result_info_json_format.txt
index 42b972a757e..486f9a54e66 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt
+++
b/flink-table/flink-sql-gateway/src/test/resources/result_info_json_format.txt
@@ -1 +1 @@
-{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment"
[...]
+{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment"
[...]
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/result_info_plain_text_format.txt
b/flink-table/flink-sql-gateway/src/test/resources/result_info_plain_text_format.txt
new file mode 100644
index 00000000000..a640abfc9cc
--- /dev/null
+++
b/flink-table/flink-sql-gateway/src/test/resources/result_info_plain_text_format.txt
@@ -0,0 +1 @@
+{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment"
[...]
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q
b/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q
index 56479dc4826..a36e200a606 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q
@@ -73,17 +73,6 @@ create table StreamingTable2 (
1 row in set
!ok
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'c6d2eb2ade68485fb4d1848294150861';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
BEGIN STATEMENT SET;
!output
+--------+
@@ -136,23 +125,8 @@ INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1,
'Hello World'), (2, 'Hi'),
END;
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| c6d2eb2ade68485fb4d1848294150861 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
SELECT * FROM StreamingTable;
!output
@@ -217,17 +191,6 @@ create table BatchTable (
1 row in set
!ok
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'e66a9bbf66c04a41b4d687e26c8296a0';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
BEGIN STATEMENT SET;
!output
+--------+
@@ -260,23 +223,8 @@ INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello
World'), (2, 'Hi'), (2,
END;
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| e66a9bbf66c04a41b4d687e26c8296a0 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
SELECT * FROM BatchTable;
!output
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
index 295b822cff0..323f24b81f1 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -56,26 +56,10 @@ create table StreamingTable (
1 row in set
!ok
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'e68e7fabddfade4f42910980652582dc';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2,
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| e68e7fabddfade4f42910980652582dc |
-+----------------------------------+
-1 row in set
-!ok
+Job ID:
+!info
RESET '$internal.pipeline.job-id';
!output
@@ -134,36 +118,10 @@ create table BatchTable (
1 row in set
!ok
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = '29ba2263b9b86bd8a14b91487941bfe7';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'),
(2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| 29ba2263b9b86bd8a14b91487941bfe7 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
SELECT * FROM BatchTable;
!output
@@ -181,17 +139,6 @@ SELECT * FROM BatchTable;
7 rows in set
!ok
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = '84c7408a08c284d5736e50d3f5a648be';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
CREATE TABLE CtasTable
WITH (
'connector' = 'filesystem',
@@ -200,23 +147,8 @@ WITH (
)
AS SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3,
'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')) T(id, str);
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| 84c7408a08c284d5736e50d3f5a648be |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
SELECT * FROM CtasTable;
!output
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index 9f95973c831..63e9ccf2a75 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -120,39 +120,13 @@
Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX
+- Reused(reference_id=[1])
!ok
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'a5513ca0a886c6c9bafaf3acac43bfa5';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
EXECUTE STATEMENT SET BEGIN
INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2,
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2,
'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
END;
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| a5513ca0a886c6c9bafaf3acac43bfa5 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
SELECT * FROM StreamingTable;
!output
@@ -235,7 +209,6 @@ str string
1 row in set
!ok
-
create table BatchTable2 (
id int,
str string
@@ -285,40 +258,14 @@
Sink(table=[default_catalog.default_database.BatchTable2], fields=[EXPR$0, EXPR$
+- Reused(reference_id=[1])
!ok
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = '2e2dc0a5a6315296062ba81eba340668';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
-
EXECUTE STATEMENT SET
BEGIN
INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'),
(2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'),
(2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
END;
!output
-+----------------------------------+
-| job id |
-+----------------------------------+
-| 2e2dc0a5a6315296062ba81eba340668 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-| OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
SELECT * FROM BatchTable;
!output
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot
index ae9bf88d5ca..8e5b27d3b6c 100644
---
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot
+++
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot
@@ -242,19 +242,7 @@
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
},
"response" : {
- "type" : "object",
- "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:FetchResultsResponseBody",
- "properties" : {
- "results" : {
- "type" : "any"
- },
- "resultType" : {
- "type" : "string"
- },
- "nextResultUri" : {
- "type" : "string"
- }
- }
+ "type" : "any"
}
}, {
"url" : "/sessions/:session_handle/operations/:operation_handle/status",
diff --git
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
index 92824ac0c44..f4d14c6089f 100644
---
a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
+++
b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
@@ -301,26 +301,17 @@
} ]
},
"query-parameters" : {
- "queryParameters" : [ ]
+ "queryParameters" : [ {
+ "key" : "rowFormat",
+ "mandatory" : true
+ } ]
},
"request" : {
"type" : "object",
"id" :
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
},
"response" : {
- "type" : "object",
- "id" :
"urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:FetchResultsResponseBody",
- "properties" : {
- "results" : {
- "type" : "any"
- },
- "resultType" : {
- "type" : "string"
- },
- "nextResultUri" : {
- "type" : "string"
- }
- }
+ "type" : "any"
}
}, {
"url" : "/sessions/:session_handle/operations/:operation_handle/status",
@@ -391,4 +382,4 @@
}
}
} ]
-}
+}
\ No newline at end of file