This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 029860f26ce [FLINK-30693][sql-gateway] Supports to specify RowFormat when fetching results
029860f26ce is described below

commit 029860f26ce8455cdd2bd1244785f42fee0440ca
Author: yuzelin <[email protected]>
AuthorDate: Sun Jan 15 20:55:50 2023 +0800

    [FLINK-30693][sql-gateway] Supports to specify RowFormat when fetching results
    
    This closes #21677
---
 .../endpoint/hive/HiveServer2EndpointITCase.java   |   4 +-
 .../hive/HiveServer2EndpointStatementITCase.java   |  75 ++++++++----
 .../src/test/resources/endpoint/select.q           |  58 +---------
 .../table/gateway/api/results/ResultSetImpl.java   |   4 +
 .../table/gateway/rest/SqlGatewayRestEndpoint.java |  16 ++-
 .../handler/statement/FetchResultsHandler.java     |  54 ++++++---
 .../rest/header/statement/FetchResultsHeaders.java |  78 ++++++++++---
 .../statement/FetchResultResponseBodyImpl.java     |  84 ++++++++++++++
 ...ers.java => FetchResultsMessageParameters.java} |  30 +++--
 .../statement/FetchResultsResponseBody.java        |  52 +++------
 .../FetchResultsRowFormatQueryParameter.java       |  49 ++++++++
 .../statement/NotReadyFetchResultResponse.java}    |  48 ++++----
 .../serde/FetchResultResponseBodyDeserializer.java |  90 +++++++++++++++
 .../serde/FetchResultsResponseBodySerializer.java  |  67 +++++++++++
 .../flink/table/gateway/rest/serde/ResultInfo.java | 126 ++++++++++++++++-----
 ...serializer.java => ResultInfoDeserializer.java} |  64 ++++++++---
 ...onSerializer.java => ResultInfoSerializer.java} | 100 ++++++++++------
 .../flink/table/gateway/rest/util/RowFormat.java   |  36 ++++++
 .../service/operation/OperationManager.java        |   5 +-
 .../gateway/service/result/NotReadyResult.java     |   2 +-
 .../gateway/AbstractSqlGatewayStatementITCase.java |  63 ++++++++---
 .../table/gateway/rest/OperationRelatedITCase.java |   4 +-
 .../table/gateway/rest/RestAPIITCaseBase.java      |   4 +-
 .../gateway/rest/SqlGatewayRestEndpointITCase.java |   4 +-
 .../SqlGatewayRestEndpointStatementITCase.java     | 100 +++++++++++-----
 .../gateway/rest/SqlGatewayRestEndpointTest.java   |   4 +-
 .../table/gateway/rest/StatementRelatedITCase.java |  20 +++-
 .../rest/serde/ResultInfoJsonSerDeTest.java        | 123 ++++++++++++++------
 .../rest/util/SqlGatewayRestEndpointExtension.java |   4 +-
 ...s.java => SqlGatewayRestEndpointTestUtils.java} |  16 ++-
 .../gateway/service/SqlGatewayServiceITCase.java   |   4 +-
 ...{resultInfo.txt => result_info_json_format.txt} |   2 +-
 .../resources/result_info_plain_text_format.txt    |   1 +
 .../src/test/resources/sql/begin_statement_set.q   |  60 +---------
 .../src/test/resources/sql/insert.q                |  80 +------------
 .../src/test/resources/sql/statement_set.q         |  61 +---------
 .../resources/sql_gateway_rest_api_v1.snapshot     |  14 +--
 .../resources/sql_gateway_rest_api_v2.snapshot     |  21 +---
 38 files changed, 1042 insertions(+), 585 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
index 98e8612a54a..71bb5277f37 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
 import org.apache.flink.table.gateway.service.session.SessionManager;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;
@@ -114,7 +115,6 @@ import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYN
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toSessionHandle;
 import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle;
-import static org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
 import static org.apache.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -816,7 +816,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
                                 sessionHandle,
                                 () -> {
                                     latch.await();
-                                    return NOT_READY_RESULT;
+                                    return NotReadyResult.INSTANCE;
                                 });
         manipulateOp.accept(
                 toTOperationHandle(sessionHandle, operationHandle, TOperationType.UNKNOWN));
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
index 09127b995ef..bccc4ffdadd 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointStatementITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.gateway.AbstractSqlGatewayStatementITCase;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
 import org.apache.hive.jdbc.HiveConnection;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
@@ -39,8 +40,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
 
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -53,6 +52,7 @@ import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
@@ -68,6 +68,16 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
     private Connection connection;
     private Statement statement;
 
+    @Parameters(name = "parameters={0}")
+    public static List<TestParameters> parameters() throws Exception {
+        return Stream.concat(
+                        listFlinkSqlTests().stream()
+                                .map(path -> new HiveTestParameters(path, true)),
+                        listTestSpecInTheSameModule("endpoint").stream()
+                                .map(path -> new HiveTestParameters(path, false)))
+                .collect(Collectors.toList());
+    }
+
     @BeforeEach
     @Override
     public void before(@TempDir Path temporaryFolder) throws Exception {
@@ -82,16 +92,6 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
         connection.close();
     }
 
-    public static Stream<String> listHiveSqlTests() throws Exception {
-        return listTestSpecInTheSameModule("endpoint");
-    }
-
-    @ParameterizedTest
-    @MethodSource("listHiveSqlTests")
-    public void testHiveSqlStatements(String sqlPath) throws Exception {
-        runTest(sqlPath);
-    }
-
     @Override
     protected String runSingleStatement(String sql) throws Exception {
         statement.execute(sql);
@@ -151,18 +151,20 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
     }
 
     @Override
-    protected void resetSessionForFlinkSqlStatements() throws Exception {
-        for (String sql :
-                Arrays.asList(
-                        "RESET",
-                        "CREATE CATALOG `default_catalog` \n"
-                                + "WITH (\n"
-                                + "'type' = 'generic_in_memory',\n"
-                                + "'default-database' = 'default_database')",
-                        "USE CATALOG `default_catalog`",
-                        "DROP CATALOG hive",
-                        "UNLOAD MODULE hive")) {
-            runSingleStatement(sql);
+    protected void prepareEnvironment() throws Exception {
+        if (((HiveTestParameters) parameters).getResetEnvironment()) {
+            for (String sql :
+                    Arrays.asList(
+                            "RESET",
+                            "CREATE CATALOG `default_catalog` \n"
+                                    + "WITH (\n"
+                                    + "'type' = 'generic_in_memory',\n"
+                                    + "'default-database' = 'default_database')",
+                            "USE CATALOG `default_catalog`",
+                            "DROP CATALOG hive",
+                            "UNLOAD MODULE hive")) {
+                runSingleStatement(sql);
+            }
         }
     }
 
@@ -183,4 +185,29 @@ public class HiveServer2EndpointStatementITCase extends AbstractSqlGatewayStatem
 
         return DataTypes.ROW(fields);
     }
+
+    private static class HiveTestParameters extends TestParameters {
+
+        private final boolean resetEnvironment;
+
+        public HiveTestParameters(String sqlPath, boolean resetEnvironment) {
+            super(sqlPath);
+            this.resetEnvironment = resetEnvironment;
+        }
+
+        public boolean getResetEnvironment() {
+            return resetEnvironment;
+        }
+
+        @Override
+        public String toString() {
+            return "HiveTestParameters{"
+                    + "resetEnvironment="
+                    + resetEnvironment
+                    + ", sqlPath='"
+                    + sqlPath
+                    + '\''
+                    + '}';
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q b/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q
index 26f6c01de8b..2c7400e2e53 100644
--- a/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q
+++ b/flink-connectors/flink-connector-hive/src/test/resources/endpoint/select.q
@@ -27,35 +27,10 @@ CREATE TABLE dummy (
 1 row in set
 !ok
 
-SET $internal.pipeline.job-id = 879d95431dd5516bf93a44c9e0fdf3b5;
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 INSERT INTO dummy VALUES (1);
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| 879d95431dd5516bf93a44c9e0fdf3b5 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET $internal.pipeline.job-id;
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 # ==========================================================================
 # test all types
@@ -87,37 +62,12 @@ CREATE TABLE hive_types_table (
 1 row in set
 !ok
 
-SET $internal.pipeline.job-id = 40eadf85976ed00808ff1c6b20e8d616;
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 INSERT INTO hive_types_table
 SELECT true, 1, 2, 3, 4, 5.0, 6.0, 7.1111, 'Hello World', 'Flink Hive', '2112-12-12 00:00:05.006', '2002-12-13', 'byte', MAP('a', 'b'), ARRAY(1)
 FROM dummy LIMIT 1;
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| 40eadf85976ed00808ff1c6b20e8d616 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET $internal.pipeline.job-id;
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 SELECT * FROM hive_types_table;
 !output
diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java
index 7ab13606ecc..ea94f1c4568 100644
--- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java
+++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSetImpl.java
@@ -88,6 +88,10 @@ public class ResultSetImpl implements ResultSet {
         return data;
     }
 
+    public RowDataToStringConverter getConverter() {
+        return converter;
+    }
+
     @Override
     public boolean isQueryResult() {
         return isQueryResult;
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
index 3d0114219fc..cb573afd5b9 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpoint.java
@@ -167,10 +167,18 @@ public class SqlGatewayRestEndpoint extends RestServerEndpoint implements SqlGat
         handlers.add(Tuple2.of(ExecuteStatementHeaders.getInstance(), executeStatementHandler));
 
         // Fetch results
-        FetchResultsHandler fetchResultsHandler =
-                new FetchResultsHandler(
-                        service, responseHeaders, FetchResultsHeaders.getInstance());
-        handlers.add(Tuple2.of(FetchResultsHeaders.getInstance(), fetchResultsHandler));
+        handlers.add(
+                Tuple2.of(
+                        FetchResultsHeaders.getDefaultInstance(),
+                        new FetchResultsHandler(
+                                service,
+                                responseHeaders,
+                                FetchResultsHeaders.getDefaultInstance())));
+        handlers.add(
+                Tuple2.of(
+                        FetchResultsHeaders.getInstanceV1(),
+                        new FetchResultsHandler(
+                                service, responseHeaders, FetchResultsHeaders.getInstanceV1())));
     }
 
     @Override
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
index 52b18be0bed..8cb4ff80802 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/handler/statement/FetchResultsHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.table.gateway.rest.handler.statement;
 
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
@@ -30,14 +32,17 @@ import org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler
 import org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
 import org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPathParameter;
 import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultResponseBodyImpl;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
-import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultsRowFormatQueryParameter;
 import org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
+import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -45,49 +50,64 @@ import java.util.concurrent.CompletableFuture;
 /** Handler to fetch results. */
 public class FetchResultsHandler
         extends AbstractSqlGatewayRestHandler<
-                EmptyRequestBody, FetchResultsResponseBody, FetchResultsTokenParameters> {
+                EmptyRequestBody, FetchResultsResponseBody, FetchResultsMessageParameters> {
 
     public FetchResultsHandler(
             SqlGatewayService service,
             Map<String, String> responseHeaders,
-            MessageHeaders<EmptyRequestBody, FetchResultsResponseBody, FetchResultsTokenParameters>
+            MessageHeaders<
+                            EmptyRequestBody,
+                            FetchResultsResponseBody,
+                            FetchResultsMessageParameters>
                     messageHeaders) {
         super(service, responseHeaders, messageHeaders);
     }
 
     @Override
     protected CompletableFuture<FetchResultsResponseBody> handleRequest(
-            SqlGatewayRestAPIVersion version, @Nonnull HandlerRequest<EmptyRequestBody> request) {
+            SqlGatewayRestAPIVersion version, @Nonnull HandlerRequest<EmptyRequestBody> request)
+            throws RestHandlerException {
         // Parse the parameters
         SessionHandle sessionHandle = request.getPathParameter(SessionHandleIdPathParameter.class);
         OperationHandle operationHandle =
                 request.getPathParameter(OperationHandleIdPathParameter.class);
         Long token = request.getPathParameter(FetchResultsTokenPathParameter.class);
+        RowFormat rowFormat =
+                HandlerRequestUtils.getQueryParameter(
+                        request, FetchResultsRowFormatQueryParameter.class, RowFormat.JSON);
 
         // Get the statement results
-        @Nullable ResultSet resultSet;
-        @Nullable String resultType;
-        Long nextToken;
-
+        ResultSet resultSet;
         try {
             resultSet =
                     service.fetchResults(sessionHandle, operationHandle, token, Integer.MAX_VALUE);
-            nextToken = resultSet.getNextToken();
-            resultType = resultSet.getResultType().toString();
         } catch (Exception e) {
             throw new SqlGatewayException(e);
         }
 
-        // Build the response
+        ResultSet.ResultType resultType = resultSet.getResultType();
+        Long nextToken = resultSet.getNextToken();
         String nextResultUri =
                 FetchResultsHeaders.buildNextUri(
-                        version.name().toLowerCase(),
+                        version,
                         sessionHandle.getIdentifier().toString(),
                         operationHandle.getIdentifier().toString(),
-                        nextToken);
+                        nextToken,
+                        rowFormat);
 
-        return CompletableFuture.completedFuture(
-                new FetchResultsResponseBody(
-                        ResultInfo.createResultInfo(resultSet), resultType, nextResultUri));
+        // Build the response
+        if (resultType == ResultSet.ResultType.NOT_READY) {
+            return CompletableFuture.completedFuture(
+                    new NotReadyFetchResultResponse(nextResultUri));
+        } else {
+            return CompletableFuture.completedFuture(
+                    new FetchResultResponseBodyImpl(
+                            resultType,
+                            resultSet.isQueryResult(),
+                            resultSet.getJobID(),
+                            resultSet.getResultKind(),
+                            ResultInfo.createResultInfo(resultSet, rowFormat),
+                            nextResultUri));
+        }
     }
 }
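
The handler above reads the new rowFormat query parameter and falls back to RowFormat.JSON when it is absent. A minimal sketch of the request URI a client could build for the non-V1 endpoint; the identifiers and the "v2" path prefix are invented placeholders, only the URI shape comes from this commit:

    // Hypothetical identifiers; only the URI layout is taken from this commit.
    String sessionId = "9a3f1bc2-5e74-4f10-9d2b-0d4c2f6e7a18";
    String operationId = "5b7c2d90-1a6e-4f3b-8c21-7e9d0a4b6c53";
    long token = 0L;
    String uri =
            String.format(
                    "/v2/sessions/%s/operations/%s/result/%s?rowFormat=PLAIN_TEXT",
                    sessionId, operationId, token);
    // Omitting "?rowFormat=..." leaves the handler on its RowFormat.JSON default.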
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
index 274b24ff129..49399ed67d7 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/header/statement/FetchResultsHeaders.java
@@ -20,23 +20,35 @@ package org.apache.flink.table.gateway.rest.header.statement;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.table.gateway.rest.header.SqlGatewayMessageHeaders;
 import 
org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPathParameter;
 import 
org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
-import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenPathParameter;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion.V1;
+
 /** Message headers for fetching results. */
 public class FetchResultsHeaders
         implements SqlGatewayMessageHeaders<
-                EmptyRequestBody, FetchResultsResponseBody, FetchResultsTokenParameters> {
+                EmptyRequestBody, FetchResultsResponseBody, FetchResultsMessageParameters> {
 
-    private static final FetchResultsHeaders INSTANCE = new FetchResultsHeaders();
+    private static final FetchResultsHeaders INSTANCE_V1 = new FetchResultsHeaders(V1);
+    private static final FetchResultsHeaders DEFAULT_INSTANCE =
+            new FetchResultsHeaders(SqlGatewayRestAPIVersion.getDefaultVersion());
 
     public static final String URL =
             "/sessions/:"
@@ -46,6 +58,41 @@ public class FetchResultsHeaders
                     + "/result/:"
                     + FetchResultsTokenPathParameter.KEY;
 
+    private final SqlGatewayRestAPIVersion version;
+
+    private FetchResultsHeaders(SqlGatewayRestAPIVersion version) {
+        this.version = version;
+    }
+
+    public static FetchResultsHeaders getInstanceV1() {
+        return INSTANCE_V1;
+    }
+
+    public static FetchResultsHeaders getDefaultInstance() {
+        return DEFAULT_INSTANCE;
+    }
+
+    public static @Nullable String buildNextUri(
+            SqlGatewayRestAPIVersion version,
+            String sessionId,
+            String operationId,
+            Long nextToken,
+            RowFormat rowFormat) {
+        if (nextToken == null) {
+            return null;
+        }
+
+        if (version == V1) {
+            return String.format(
+                    "/%s/sessions/%s/operations/%s/result/%s",
+                    version, sessionId, operationId, nextToken);
+        } else {
+            return String.format(
+                    "/%s/sessions/%s/operations/%s/result/%s?rowFormat=%s",
+                    version, sessionId, operationId, nextToken, rowFormat);
+        }
+    }
+
     @Override
     public Class<FetchResultsResponseBody> getResponseClass() {
         return FetchResultsResponseBody.class;
@@ -71,20 +118,15 @@ public class FetchResultsHeaders
         return URL;
     }
 
-    public static FetchResultsHeaders getInstance() {
-        return INSTANCE;
-    }
-
-    @Nullable
-    public static String buildNextUri(
-            String version, String sessionId, String operationId, Long nextToken) {
-        if (nextToken != null) {
-            return String.format(
-                    "/%s/sessions/%s/operations/%s/result/%s",
-                    version, sessionId, operationId, nextToken);
+    @Override
+    public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
+        if (version == V1) {
+            return Collections.singleton(V1);
         } else {
-            // Empty uri indicates there is no more data
-            return null;
+            return Arrays.stream(SqlGatewayRestAPIVersion.values())
+                    .filter(SqlGatewayRestAPIVersion::isStableVersion)
+                    .filter(version -> version != V1)
+                    .collect(Collectors.toList());
         }
     }
 
@@ -94,8 +136,8 @@ public class FetchResultsHeaders
     }
 
     @Override
-    public FetchResultsTokenParameters getUnresolvedMessageParameters() {
-        return new FetchResultsTokenParameters();
+    public FetchResultsMessageParameters getUnresolvedMessageParameters() {
+        return new FetchResultsMessageParameters(version);
     }
 
     @Override
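
buildNextUri now carries both the negotiated API version and the row format forward, so a follow-up fetch keeps the format the client originally asked for. A hedged sketch of the two shapes it can produce; how the version segment renders depends on SqlGatewayRestAPIVersion's string form, which this diff does not show, and V2 is used here only as an example of a non-V1 stable version:

    // Placeholder identifiers, for illustration only.
    String sessionId = "<session-id>";
    String operationId = "<operation-id>";

    String v1Uri =
            FetchResultsHeaders.buildNextUri(
                    SqlGatewayRestAPIVersion.V1, sessionId, operationId, 1L, RowFormat.JSON);
    // V1 keeps the old layout: /<version>/sessions/<session-id>/operations/<operation-id>/result/1

    String v2Uri =
            FetchResultsHeaders.buildNextUri(
                    SqlGatewayRestAPIVersion.V2, sessionId, operationId, 1L, RowFormat.PLAIN_TEXT);
    // Later versions append the format: .../result/1?rowFormat=PLAIN_TEXT

    // A null nextToken short-circuits to null, signalling that there is no more data.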
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultResponseBodyImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultResponseBodyImpl.java
new file mode 100644
index 00000000000..14a19f3d677
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultResponseBodyImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+
+import javax.annotation.Nullable;
+
+/** The implementation of the {@link FetchResultsResponseBody} with ready results. */
+public class FetchResultResponseBodyImpl implements FetchResultsResponseBody {
+
+    private final ResultInfo results;
+    private final ResultSet.ResultType resultType;
+    @Nullable private final String nextResultUri;
+    private final boolean isQueryResult;
+    @Nullable private final JobID jobID;
+    private final ResultKind resultKind;
+
+    public FetchResultResponseBodyImpl(
+            ResultSet.ResultType resultType,
+            boolean isQueryResult,
+            @Nullable JobID jobID,
+            ResultKind resultKind,
+            ResultInfo results,
+            @Nullable String nextResultUri) {
+        this.results = results;
+        this.resultType = resultType;
+        this.nextResultUri = nextResultUri;
+        this.isQueryResult = isQueryResult;
+        this.jobID = jobID;
+        this.resultKind = resultKind;
+    }
+
+    @Override
+    public ResultInfo getResults() {
+        return results;
+    }
+
+    @Override
+    public ResultSet.ResultType getResultType() {
+        return resultType;
+    }
+
+    @Nullable
+    @Override
+    public String getNextResultUri() {
+        return nextResultUri;
+    }
+
+    @Override
+    public boolean isQueryResult() {
+        return isQueryResult;
+    }
+
+    @Nullable
+    @Override
+    public JobID getJobID() {
+        return jobID;
+    }
+
+    @Override
+    public ResultKind getResultKind() {
+        return resultKind;
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsMessageParameters.java
similarity index 68%
rename from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
rename to flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsMessageParameters.java
index 132dc631a7b..0f74a1dc628 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsTokenParameters.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsMessageParameters.java
@@ -25,13 +25,15 @@ import org.apache.flink.table.gateway.api.operation.OperationHandle;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.rest.message.operation.OperationHandleIdPathParameter;
 import org.apache.flink.table.gateway.rest.message.session.SessionHandleIdPathParameter;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 
-/** {@link MessagePathParameter} for fetching results. */
-public class FetchResultsTokenParameters extends MessageParameters {
+/** {@link MessageParameters} for fetching results. */
+public class FetchResultsMessageParameters extends MessageParameters {
 
     private final SessionHandleIdPathParameter sessionHandleIdPathParameter =
             new SessionHandleIdPathParameter();
@@ -42,15 +44,25 @@ public class FetchResultsTokenParameters extends MessageParameters {
     private final FetchResultsTokenPathParameter fetchResultsTokenPathParameter =
             new FetchResultsTokenPathParameter();
 
-    public FetchResultsTokenParameters() {
-        // nothing to resolve
+    private final FetchResultsRowFormatQueryParameter fetchResultsRowFormatQueryParameter =
+            new FetchResultsRowFormatQueryParameter();
+
+    private final SqlGatewayRestAPIVersion version;
+
+    public FetchResultsMessageParameters(SqlGatewayRestAPIVersion version) {
+        this.version = version;
     }
 
-    public FetchResultsTokenParameters(
-            SessionHandle sessionHandle, OperationHandle operationHandle, Long token) {
+    public FetchResultsMessageParameters(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            Long token,
+            RowFormat rowFormat) {
+        this.version = SqlGatewayRestAPIVersion.getDefaultVersion();
         sessionHandleIdPathParameter.resolve(sessionHandle);
         operationHandleIdPathParameter.resolve(operationHandle);
         fetchResultsTokenPathParameter.resolve(token);
+        fetchResultsRowFormatQueryParameter.resolve(Collections.singletonList(rowFormat));
     }
 
     @Override
@@ -63,6 +75,10 @@ public class FetchResultsTokenParameters extends MessageParameters {
 
     @Override
     public Collection<MessageQueryParameter<?>> getQueryParameters() {
-        return Collections.emptyList();
+        if (version == SqlGatewayRestAPIVersion.V1) {
+            return Collections.emptyList();
+        } else {
+            return Collections.singletonList(fetchResultsRowFormatQueryParameter);
+        }
     }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
index ccdaee00731..20d24b4aad0 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsResponseBody.java
@@ -18,57 +18,35 @@
 
 package org.apache.flink.table.gateway.rest.message.statement;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.rest.serde.FetchResultResponseBodyDeserializer;
+import org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
-import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonDeserializer;
-import org.apache.flink.table.gateway.rest.serde.ResultInfoJsonSerializer;
 
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import javax.annotation.Nullable;
 
 /** {@link ResponseBody} for executing a statement. */
-public class FetchResultsResponseBody implements ResponseBody {
+@JsonSerialize(using = FetchResultsResponseBodySerializer.class)
+@JsonDeserialize(using = FetchResultResponseBodyDeserializer.class)
+public interface FetchResultsResponseBody extends ResponseBody {
 
-    private static final String FIELD_RESULT_TYPE = "resultType";
-    private static final String FIELD_RESULTS = "results";
-    private static final String FIELD_NEXT_RESULT_URI = "nextResultUri";
+    ResultInfo getResults();
 
-    @JsonProperty(FIELD_RESULTS)
-    @JsonSerialize(using = ResultInfoJsonSerializer.class)
-    @JsonDeserialize(using = ResultInfoJsonDeserializer.class)
-    private final ResultInfo results;
+    ResultSet.ResultType getResultType();
 
-    @JsonProperty(FIELD_RESULT_TYPE)
-    private final String resultType;
-
-    @JsonProperty(FIELD_NEXT_RESULT_URI)
     @Nullable
-    private final String nextResultUri;
-
-    @JsonCreator
-    public FetchResultsResponseBody(
-            @JsonProperty(FIELD_RESULTS) ResultInfo results,
-            @JsonProperty(FIELD_RESULT_TYPE) String resultType,
-            @Nullable @JsonProperty(FIELD_NEXT_RESULT_URI) String nextResultUri) {
-        this.results = results;
-        this.resultType = resultType;
-        this.nextResultUri = nextResultUri;
-    }
+    String getNextResultUri();
 
-    public ResultInfo getResults() {
-        return results;
-    }
-
-    public String getResultType() {
-        return resultType;
-    }
+    boolean isQueryResult();
 
     @Nullable
-    public String getNextResultUri() {
-        return nextResultUri;
-    }
+    JobID getJobID();
+
+    ResultKind getResultKind();
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsRowFormatQueryParameter.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsRowFormatQueryParameter.java
new file mode 100644
index 00000000000..8ee20160937
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/FetchResultsRowFormatQueryParameter.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.message.statement;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+
+/**
+ * A {@link MessageQueryParameter} that parses the 'rowFormat' query parameter for fetching results.
+ */
+public class FetchResultsRowFormatQueryParameter extends MessageQueryParameter<RowFormat> {
+
+    public static final String KEY = "rowFormat";
+
+    public FetchResultsRowFormatQueryParameter() {
+        super(KEY, MessageParameterRequisiteness.MANDATORY);
+    }
+
+    @Override
+    public RowFormat convertStringToValue(String value) {
+        return RowFormat.valueOf(value.toUpperCase());
+    }
+
+    @Override
+    public String convertValueToString(RowFormat value) {
+        return value.name();
+    }
+
+    @Override
+    public String getDescription() {
+        return "The row format to serialize the RowData.";
+    }
+}
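
Because convertStringToValue upper-cases the raw value before calling RowFormat.valueOf, the parameter is effectively case-insensitive, and unrecognized values fail fast inside valueOf. A small sketch of that behaviour, assuming the JSON and PLAIN_TEXT constants referenced elsewhere in this commit:

    FetchResultsRowFormatQueryParameter param = new FetchResultsRowFormatQueryParameter();

    RowFormat json = param.convertStringToValue("json");         // RowFormat.JSON
    RowFormat plain = param.convertStringToValue("Plain_Text");  // RowFormat.PLAIN_TEXT

    try {
        param.convertStringToValue("csv");
    } catch (IllegalArgumentException expected) {
        // RowFormat has no CSV constant, so valueOf rejects the input.
    }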
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/NotReadyFetchResultResponse.java
similarity index 50%
copy from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
copy to flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/NotReadyFetchResultResponse.java
index e56b6024c02..0eda21724e1 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/statement/NotReadyFetchResultResponse.java
@@ -16,60 +16,58 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.gateway.service.result;
+package org.apache.flink.table.gateway.rest.message.statement;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.table.api.ResultKind;
-import org.apache.flink.table.catalog.ResolvedSchema;
-import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.rest.serde.ResultInfo;
 
-import java.util.Collections;
-import java.util.List;
+import javax.annotation.Nullable;
 
-/** To represent that the execution result is not ready to fetch. */
-public class NotReadyResult implements ResultSet {
+/** The {@link FetchResultsResponseBody} indicates the results is not ready. */
+public class NotReadyFetchResultResponse implements FetchResultsResponseBody {
 
-    public static final NotReadyResult NOT_READY_RESULT = new NotReadyResult();
+    private final String nextResultUri;
 
-    private NotReadyResult() {}
-
-    @Override
-    public ResultType getResultType() {
-        return ResultType.NOT_READY;
+    public NotReadyFetchResultResponse(String nextResultUri) {
+        this.nextResultUri = nextResultUri;
     }
 
     @Override
-    public Long getNextToken() {
-        return 0L;
+    public ResultInfo getResults() {
+        throw new SqlGatewayException(
+                "The result is not ready. Please fetch results until the result type is PAYLOAD or EOS.");
     }
 
     @Override
-    public ResolvedSchema getResultSchema() {
-        throw new UnsupportedOperationException(
-                "Don't know the schema for the result. Please continue fetching results until the result type is PAYLOAD or EOS.");
+    public ResultSet.ResultType getResultType() {
+        return ResultSet.ResultType.NOT_READY;
     }
 
+    @Nullable
     @Override
-    public List<RowData> getData() {
-        return Collections.emptyList();
+    public String getNextResultUri() {
+        return nextResultUri;
     }
 
     @Override
     public boolean isQueryResult() {
-        throw new UnsupportedOperationException(
+        throw new SqlGatewayException(
                 "Don't know whether a NOT_READY_RESULT is for a query. Please continue fetching results until the result type is PAYLOAD or EOS.");
     }
 
+    @Nullable
     @Override
     public JobID getJobID() {
-        throw new UnsupportedOperationException(
-                "Can't get job ID from a NOT_READY_RESULT. Please continue fetching results until the result type is PAYLOAD or EOS.");
+        throw new SqlGatewayException(
+                "Don't know the Job ID with NOT_READY_RESULT. Please continue fetching results until the result type is PAYLOAD or EOS.");
     }
 
     @Override
     public ResultKind getResultKind() {
-        throw new UnsupportedOperationException(
-                "Can't get result kind from a NOT_READY_RESULT. Please continue fetching results until the result type is PAYLOAD or EOS.");
+        throw new SqlGatewayException(
+                "Don't know the ResultKind with NOT_READY_RESULT. Please continue fetching results until the result type is PAYLOAD or EOS.");
     }
 }
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultResponseBodyDeserializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultResponseBodyDeserializer.java
new file mode 100644
index 00000000000..792bfc6b1a1
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultResponseBodyDeserializer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.serde;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.table.api.ResultKind;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultResponseBodyImpl;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+import org.apache.flink.table.gateway.rest.message.statement.NotReadyFetchResultResponse;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+
+import java.io.IOException;
+
+import static org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_IS_QUERY_RESULT;
+import static org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_JOB_ID;
+import static org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_NEXT_RESULT_URI;
+import static org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_RESULTS;
+import static org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_RESULT_KIND;
+import static org.apache.flink.table.gateway.rest.serde.FetchResultsResponseBodySerializer.FIELD_RESULT_TYPE;
+
+/** Deserializer to deserialize {@link FetchResultsResponseBody}. */
+public class FetchResultResponseBodyDeserializer extends StdDeserializer<FetchResultsResponseBody> {
+
+    private static final long serialVersionUID = 1L;
+
+    protected FetchResultResponseBodyDeserializer() {
+        super(FetchResultsResponseBody.class);
+    }
+
+    @Override
+    public FetchResultsResponseBody deserialize(
+            JsonParser jsonParser, DeserializationContext context) throws IOException {
+        final JsonNode jsonNode = jsonParser.readValueAsTree();
+
+        ResultSet.ResultType resultType = getResultType(jsonNode);
+        if (resultType == ResultSet.ResultType.NOT_READY) {
+            return new NotReadyFetchResultResponse(
+                    jsonNode.required(FIELD_NEXT_RESULT_URI).asText());
+        }
+
+        boolean isQuery = jsonNode.required(FIELD_IS_QUERY_RESULT).asBoolean();
+        ResultInfo result =
+                context.readValue(
+                        jsonNode.required(FIELD_RESULTS).traverse(jsonParser.getCodec()),
+                        ResultInfo.class);
+        ResultKind resultKind = ResultKind.valueOf(jsonNode.required(FIELD_RESULT_KIND).asText());
+        JobID jobID =
+                jsonNode.has(FIELD_JOB_ID)
+                        ? JobID.fromHexString(jsonNode.get(FIELD_JOB_ID).asText())
+                        : null;
+        return new FetchResultResponseBodyImpl(
+                resultType,
+                isQuery,
+                jobID,
+                resultKind,
+                result,
+                jsonNode.has(FIELD_NEXT_RESULT_URI)
+                        ? jsonNode.get(FIELD_NEXT_RESULT_URI).asText()
+                        : null);
+    }
+
+    private ResultSet.ResultType getResultType(JsonNode serialized) throws IOException {
+        if (!serialized.has(FIELD_RESULT_TYPE)) {
+            throw new IOException(
+                    String.format("Can not find the required field %s.", FIELD_RESULT_TYPE));
+        }
+        return ResultSet.ResultType.valueOf(serialized.get(FIELD_RESULT_TYPE).asText());
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultsResponseBodySerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultsResponseBodySerializer.java
new file mode 100644
index 00000000000..5972031ccb7
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/FetchResultsResponseBodySerializer.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.serde;
+
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+
+/** Serializer to serialize {@link FetchResultsResponseBody}. */
+public class FetchResultsResponseBodySerializer extends StdSerializer<FetchResultsResponseBody> {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String FIELD_RESULT_TYPE = "resultType";
+    public static final String FIELD_IS_QUERY_RESULT = "isQueryResult";
+    public static final String FIELD_JOB_ID = "jobID";
+    public static final String FIELD_RESULT_KIND = "resultKind";
+    public static final String FIELD_RESULTS = "results";
+    public static final String FIELD_NEXT_RESULT_URI = "nextResultUri";
+
+    public FetchResultsResponseBodySerializer() {
+        super(FetchResultsResponseBody.class);
+    }
+
+    @Override
+    public void serialize(
+            FetchResultsResponseBody value, JsonGenerator gen, SerializerProvider provider)
+            throws IOException {
+
+        gen.writeStartObject();
+        provider.defaultSerializeField(FIELD_RESULT_TYPE, value.getResultType(), gen);
+        if (value.getResultType() != ResultSet.ResultType.NOT_READY) {
+            // PAYLOAD or EOS
+            provider.defaultSerializeField(FIELD_IS_QUERY_RESULT, value.isQueryResult(), gen);
+            if (value.getJobID() != null) {
+                provider.defaultSerializeField(FIELD_JOB_ID, value.getJobID().toHexString(), gen);
+            }
+            provider.defaultSerializeField(FIELD_RESULT_KIND, value.getResultKind(), gen);
+            provider.defaultSerializeField(FIELD_RESULTS, value.getResults(), gen);
+        }
+        if (value.getNextResultUri() != null) {
+            gen.writeStringField(FIELD_NEXT_RESULT_URI, value.getNextResultUri());
+        }
+        }
+        gen.writeEndObject();
+    }
+}
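
For orientation, the field constants above imply a layout roughly like the sketch below for a PAYLOAD response; every value here is invented and the nested results object (written by ResultInfoSerializer) is elided. A NOT_READY response carries only resultType plus, when present, nextResultUri:

    // Invented example of the serialized form; not taken from a real run.
    String examplePayloadJson =
            "{"
                    + "\"resultType\":\"PAYLOAD\","
                    + "\"isQueryResult\":true,"
                    + "\"jobID\":\"29bbe40915a54e8f8b74faf269303d24\","
                    + "\"resultKind\":\"SUCCESS_WITH_CONTENT\","
                    + "\"results\":{},"
                    + "\"nextResultUri\":\"/v2/sessions/.../operations/.../result/1?rowFormat=JSON\""
                    + "}";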
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
index c7ec5b05ff9..c34fb84dced 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfo.java
@@ -20,75 +20,121 @@ package org.apache.flink.table.gateway.rest.serde;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.results.ResultSetImpl;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
+import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.table.types.logical.VarCharType.STRING_TYPE;
+
 /**
  * A {@code ResultInfo} contains information of a {@link ResultSet}. It is designed for transferring
  * the information of ResultSet via REST. For its serialization and deserialization, See:
  *
- * <p>{@link ResultInfoJsonSerializer} and {@link ResultInfoJsonDeserializer}
+ * <p>{@link ResultInfoSerializer} and {@link ResultInfoDeserializer}
  */
 @Internal
+@JsonSerialize(using = ResultInfoSerializer.class)
+@JsonDeserialize(using = ResultInfoDeserializer.class)
 public class ResultInfo {
 
-    // Columns
-    public static final String FIELD_NAME_COLUMN_INFOS = "columns";
-
-    // RowData
-    public static final String FIELD_NAME_DATA = "data";
-    public static final String FIELD_NAME_KIND = "kind";
-    public static final String FIELD_NAME_FIELDS = "fields";
-
     private final List<ColumnInfo> columnInfos;
     private final List<RowData> data;
+    private final RowFormat rowFormat;
 
-    public ResultInfo(List<ColumnInfo> columnInfos, List<RowData> data) {
+    ResultInfo(List<ColumnInfo> columnInfos, List<RowData> data, RowFormat rowFormat) {
         this.columnInfos = columnInfos;
         this.data = data;
+        this.rowFormat = rowFormat;
     }
 
-    public static ResultInfo createResultInfo(ResultSet resultSet) {
+    public static ResultInfo createResultInfo(ResultSet resultSet, RowFormat rowFormat) {
+        Preconditions.checkArgument(resultSet.getResultType() != ResultSet.ResultType.NOT_READY);
+        List<RowData> data = resultSet.getData();
+
+        switch (rowFormat) {
+            case JSON:
+                break;
+            case PLAIN_TEXT:
+                RowDataToStringConverter converter = ((ResultSetImpl) resultSet).getConverter();
+                data =
+                        data.stream()
+                                .map(rowData -> convertToPlainText(rowData, converter))
+                                .collect(Collectors.toList());
+
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        String.format("Unsupported row format: %s.", rowFormat));
+        }
+
         return new ResultInfo(
                 resultSet.getResultSchema().getColumns().stream()
                         .map(ColumnInfo::toColumnInfo)
                         .collect(Collectors.toList()),
-                resultSet.getData());
+                data,
+                rowFormat);
     }
 
+    /** Get the column info of the data. */
     public List<ColumnInfo> getColumnInfos() {
         return Collections.unmodifiableList(columnInfos);
     }
 
+    /** Get the data. */
     public List<RowData> getData() {
         return data;
     }
 
-    public List<RowData.FieldGetter> getFieldGetters() {
-        List<LogicalType> columnTypes =
-                columnInfos.stream().map(ColumnInfo::getLogicalType).collect(Collectors.toList());
-        return IntStream.range(0, columnTypes.size())
-                .mapToObj(i -> RowData.createFieldGetter(columnTypes.get(i), i))
-                .collect(Collectors.toList());
+    /** Get the row format of the data. */
+    public RowFormat getRowFormat() {
+        return rowFormat;
     }
 
+    /**
+     * Create the {@link FieldGetter}s to get the column values in the results.
+     *
+     * <p>With the {@code JSON} format, the getters are built from the {@link ResolvedSchema};
+     * with the {@code PLAIN_TEXT} format, every column is read as {@link StringData}.
+     */
+    public List<FieldGetter> getFieldGetters() {
+        if (rowFormat == RowFormat.JSON) {
+            List<LogicalType> columnTypes =
+                    columnInfos.stream()
+                            .map(ColumnInfo::getLogicalType)
+                            .collect(Collectors.toList());
+            return IntStream.range(0, columnTypes.size())
+                    .mapToObj(i -> RowData.createFieldGetter(columnTypes.get(i), i))
+                    .collect(Collectors.toList());
+        } else {
+            return IntStream.range(0, columnInfos.size())
+                    .mapToObj(i -> RowData.createFieldGetter(STRING_TYPE, i))
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /** Get the schemas of the results. */
     public ResolvedSchema getResultSchema() {
         return ResolvedSchema.of(
                 columnInfos.stream().map(ColumnInfo::toColumn).collect(Collectors.toList()));
     }
 
-    @Override
-    public int hashCode() {
-        return Objects.hash(columnInfos, data);
-    }
-
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -98,14 +144,38 @@ public class ResultInfo {
             return false;
         }
         ResultInfo that = (ResultInfo) o;
-        return Objects.equals(columnInfos, that.columnInfos) && Objects.equals(data, that.data);
+        return Objects.equals(columnInfos, that.columnInfos)
+                && Objects.equals(data, that.data)
+                && rowFormat == that.rowFormat;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(columnInfos, data, rowFormat);
     }
 
     @Override
     public String toString() {
-        return String.format(
-                "ResultInfo{\n  columnInfos=[%s],\n  rows=[%s]\n}",
-                columnInfos.stream().map(Object::toString).collect(Collectors.joining(",")),
-                data.stream().map(Object::toString).collect(Collectors.joining(",")));
+        return "ResultInfo{"
+                + "columnInfos="
+                + columnInfos
+                + ", data="
+                + data
+                + ", rowFormat="
+                + rowFormat
+                + '}';
+    }
+
+    private static RowData convertToPlainText(RowData rowData, RowDataToStringConverter converter) {
+        String[] plainTexts = converter.convert(rowData);
+        // The RowDataToStringConverter will convert null to a specific string. Here reassign it to
+        // null and let the caller determine how to use it.
+        IntStream.range(0, rowData.getArity())
+                .filter(rowData::isNullAt)
+                .forEach(i -> plainTexts[i] = null);
+
+        return GenericRowData.ofKind(
+                rowData.getRowKind(),
+                Arrays.stream(plainTexts).map(StringData::fromString).toArray());
     }
 }
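
As a reading aid for the class above, a minimal sketch (not part of the patch; the wrapper class and helper name are made up) of how a caller could combine createResultInfo with the format-aware field getters. Under PLAIN_TEXT every field is a StringData value or null, so the getters for that format all read strings:

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.gateway.api.results.ResultSet;
    import org.apache.flink.table.gateway.rest.serde.ResultInfo;
    import org.apache.flink.table.gateway.rest.util.RowFormat;

    final class ResultInfoUsageSketch {
        // Hypothetical helper: prints every field of a ready (PAYLOAD or EOS) result set.
        static void printRows(ResultSet resultSet, RowFormat format) {
            ResultInfo info = ResultInfo.createResultInfo(resultSet, format);
            for (RowData row : info.getData()) {
                // With JSON the getters follow the resolved schema; with PLAIN_TEXT they all
                // read StringData, matching convertToPlainText above.
                info.getFieldGetters()
                        .forEach(getter -> System.out.print(getter.getFieldOrNull(row) + " "));
                System.out.println();
            }
        }
    }
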
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoDeserializer.java
similarity index 63%
rename from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java
rename to flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoDeserializer.java
index c842ebc6051..113c3268d7e 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonDeserializer.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoDeserializer.java
@@ -23,6 +23,8 @@ import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonToRowDataConverters;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.CollectionUtil;
 
@@ -39,22 +41,23 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS;
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA;
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS;
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_COLUMN_INFOS;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_DATA;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_FIELDS;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_KIND;
+import static org.apache.flink.table.gateway.rest.serde.ResultInfoSerializer.FIELD_NAME_ROW_FORMAT;
 
 /**
- * Json deserializer for {@link ResultInfo}.
+ * Deserializer for {@link ResultInfo}.
  *
- * @see ResultInfoJsonSerializer for the reverse operation.
+ * @see ResultInfoSerializer for the reverse operation.
  */
 @Internal
-public class ResultInfoJsonDeserializer extends StdDeserializer<ResultInfo> {
+public class ResultInfoDeserializer extends StdDeserializer<ResultInfo> {
 
     private static final long serialVersionUID = 1L;
 
-    public ResultInfoJsonDeserializer() {
+    public ResultInfoDeserializer() {
         super(ResultInfo.class);
     }
 
@@ -74,27 +77,52 @@ public class ResultInfoJsonDeserializer extends StdDeserializer<ResultInfo> {
                                 .treeToValue(
                                         node.get(FIELD_NAME_COLUMN_INFOS), ColumnInfo[].class));
 
-        // generate converters for all fields of each row
-        List<JsonToRowDataConverters.JsonToRowDataConverter> converters =
-                columnInfos.stream()
-                        .map(ColumnInfo::getLogicalType)
-                        .map(TO_ROW_DATA_CONVERTERS::createConverter)
-                        .collect(Collectors.toList());
+        // deserialize RowFormat
+        RowFormat rowFormat =
+                RowFormat.valueOf(node.get(FIELD_NAME_ROW_FORMAT).asText().toUpperCase());
 
         // deserialize rows
-        List<RowData> data = deserializeData((ArrayNode) node.get(FIELD_NAME_DATA), converters);
+        List<RowData> data =
+                deserializeData((ArrayNode) node.get(FIELD_NAME_DATA), columnInfos, rowFormat);
 
-        return new ResultInfo(columnInfos, data);
+        return new ResultInfo(columnInfos, data, rowFormat);
     }
 
     private List<RowData> deserializeData(
-            ArrayNode serializedRows,
-            List<JsonToRowDataConverters.JsonToRowDataConverter> converters) {
+            ArrayNode serializedRows, List<ColumnInfo> columnInfos, RowFormat rowFormat) {
+        // generate converters for all fields of each row
+        List<JsonToRowDataConverters.JsonToRowDataConverter> converters =
+                buildToRowDataConverters(columnInfos, rowFormat);
+
         List<RowData> data = new ArrayList<>();
         serializedRows.forEach(rowDataNode -> data.add(convertToRowData(rowDataNode, converters)));
         return data;
     }
 
+    private List<JsonToRowDataConverters.JsonToRowDataConverter> buildToRowDataConverters(
+            List<ColumnInfo> columnInfos, RowFormat rowFormat) {
+        if (rowFormat == RowFormat.JSON) {
+            return columnInfos.stream()
+                    .map(ColumnInfo::getLogicalType)
+                    .map(TO_ROW_DATA_CONVERTERS::createConverter)
+                    .collect(Collectors.toList());
+        } else if (rowFormat == RowFormat.PLAIN_TEXT) {
+            return IntStream.range(0, columnInfos.size())
+                    .mapToObj(
+                            i ->
+                                    (JsonToRowDataConverters.JsonToRowDataConverter)
+                                            jsonNode ->
+                                                    jsonNode.isNull()
+                                                            ? null
+                                                            : StringData.fromString(
+                                                                    jsonNode.asText()))
+                    .collect(Collectors.toList());
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unknown row format: %s.", rowFormat));
+        }
+    }
+
     private GenericRowData convertToRowData(
             JsonNode serializedRow,
             List<JsonToRowDataConverters.JsonToRowDataConverter> converters) {
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
similarity index 57%
rename from flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java
rename to flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
index 762e634b39f..b796d32ba3e 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerializer.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/serde/ResultInfoSerializer.java
@@ -23,6 +23,7 @@ import org.apache.flink.formats.common.TimestampFormat;
 import org.apache.flink.formats.json.JsonFormatOptions;
 import org.apache.flink.formats.json.RowDataToJsonConverters;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -38,29 +39,36 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_COLUMN_INFOS;
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_DATA;
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_FIELDS;
-import static org.apache.flink.table.gateway.rest.serde.ResultInfo.FIELD_NAME_KIND;
-
 /**
- * Json serializer for {@link ResultInfo}.
+ * Serializer for {@link ResultInfo}.
  *
- * @see ResultInfoJsonDeserializer for the reverse operation.
+ * @see ResultInfoDeserializer for the reverse operation.
  */
 @Internal
-public class ResultInfoJsonSerializer extends StdSerializer<ResultInfo> {
+public class ResultInfoSerializer extends StdSerializer<ResultInfo> {
+
+    // Columns
+    public static final String FIELD_NAME_COLUMN_INFOS = "columns";
+
+    // RowData
+    public static final String FIELD_NAME_DATA = "data";
+    public static final String FIELD_NAME_KIND = "kind";
+    public static final String FIELD_NAME_FIELDS = "fields";
+
+    // RowFormat
+    public static final String FIELD_NAME_ROW_FORMAT = "rowFormat";
 
     private static final long serialVersionUID = 1L;
 
-    public ResultInfoJsonSerializer() {
+    public ResultInfoSerializer() {
         super(ResultInfo.class);
     }
 
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
     private static final RowDataToJsonConverters TO_JSON_CONVERTERS =
             new RowDataToJsonConverters(
-                    TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "");
+                    TimestampFormat.ISO_8601, JsonFormatOptions.MapNullKeyMode.LITERAL, "null");
 
     @Override
     public void serialize(
@@ -74,8 +82,12 @@ public class ResultInfoJsonSerializer extends StdSerializer<ResultInfo> {
         serializerProvider.defaultSerializeField(
                 FIELD_NAME_COLUMN_INFOS, resultInfo.getColumnInfos(), jsonGenerator);
 
+        // serialize RowFormat
+        serializerProvider.defaultSerializeField(
+                FIELD_NAME_ROW_FORMAT, resultInfo.getRowFormat(), jsonGenerator);
+
         // serialize data
-        serializeData(resultInfo.getData(), buildToJsonConverters(resultInfo), jsonGenerator);
+        serializeData(resultInfo.getData(), buildRowDataConverters(resultInfo), jsonGenerator);
 
         jsonGenerator.writeEndObject();
     }
@@ -110,29 +122,49 @@ public class ResultInfoJsonSerializer extends StdSerializer<ResultInfo> {
         return serializedRowData;
     }
 
-    /** Composes the FieldGetter and RowDataToJsonConverter. */
-    private List<Function<RowData, JsonNode>> buildToJsonConverters(ResultInfo resultInfo) {
-        List<RowDataToJsonConverters.RowDataToJsonConverter> converters =
-                resultInfo.getColumnInfos().stream()
-                        .map(ColumnInfo::getLogicalType)
-                        .map(TO_JSON_CONVERTERS::createConverter)
-                        .collect(Collectors.toList());
-
+    private List<Function<RowData, JsonNode>> buildRowDataConverters(ResultInfo resultInfo) {
+        RowFormat rowFormat = resultInfo.getRowFormat();
         List<RowData.FieldGetter> fieldGetters = resultInfo.getFieldGetters();
-
-        return IntStream.range(0, converters.size())
-                .mapToObj(
-                        i ->
-                                (Function<RowData, JsonNode>)
-                                        rowData ->
-                                                converters
-                                                        .get(i)
-                                                        .convert(
-                                                                OBJECT_MAPPER,
-                                                                null,
-                                                                fieldGetters
-                                                                        .get(i)
-                                                                        .getFieldOrNull(rowData)))
-                .collect(Collectors.toList());
+        if (rowFormat == RowFormat.JSON) {
+            List<RowDataToJsonConverters.RowDataToJsonConverter> converters =
+                    resultInfo.getColumnInfos().stream()
+                            .map(ColumnInfo::getLogicalType)
+                            .map(TO_JSON_CONVERTERS::createConverter)
+                            .collect(Collectors.toList());
+
+            return IntStream.range(0, converters.size())
+                    .mapToObj(
+                            i ->
+                                    (Function<RowData, JsonNode>)
+                                            rowData ->
+                                                    converters
+                                                            .get(i)
+                                                            .convert(
+                                                                    OBJECT_MAPPER,
+                                                                    null,
+                                                                    fieldGetters
+                                                                            .get(i)
+                                                                            .getFieldOrNull(
+                                                                                    rowData)))
+                    .collect(Collectors.toList());
+        } else if (rowFormat == RowFormat.PLAIN_TEXT) {
+            return IntStream.range(0, resultInfo.getColumnInfos().size())
+                    .mapToObj(
+                            i ->
+                                    (Function<RowData, JsonNode>)
+                                            rowData -> {
+                                                Object value =
+                                                        fieldGetters.get(i).getFieldOrNull(rowData);
+                                                return value == null
+                                                        ? OBJECT_MAPPER.getNodeFactory().nullNode()
+                                                        : OBJECT_MAPPER
+                                                                .getNodeFactory()
+                                                                .textNode(value.toString());
+                                            })
+                    .collect(Collectors.toList());
+        } else {
+            throw new UnsupportedOperationException(
+                    String.format("Unknown row format: %s.", rowFormat));
+        }
     }
 }
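
A minimal round-trip sketch that mirrors the test setup added later in this patch (ResultInfoJsonSerDeTest); the wrapper class is illustrative and is assumed to live in the same rest.serde package. It shows that the payload written by the serializer carries the "columns", "rowFormat" and "data" fields declared above and can be read back by ResultInfoDeserializer:

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;

    final class ResultInfoRoundTripSketch {
        static ResultInfo roundTrip(ResultInfo original) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            SimpleModule module = new SimpleModule();
            module.addSerializer(ResultInfo.class, new ResultInfoSerializer());
            module.addDeserializer(ResultInfo.class, new ResultInfoDeserializer());
            mapper.registerModule(module);
            // Payload shape (roughly): {"columns":[...],"rowFormat":"...","data":[...]}
            String json = mapper.writeValueAsString(original);
            return mapper.readValue(json, ResultInfo.class);
        }
    }
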
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowFormat.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowFormat.java
new file mode 100644
index 00000000000..3e931935d89
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/util/RowFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.rest.util;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+
+/** Describes the serialization format of {@link RowData} in the {@link ResultSet}. */
+@PublicEvolving
+public enum RowFormat {
+    /**
+     * Serialize the RowData with JSON format. The serialized value can be deserialized by means
+     * of structural expressions.
+     */
+    JSON,
+
+    /** Serialize the RowData to SQL-compliant, plain strings. */
+    PLAIN_TEXT
+}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
index dc088fae5b1..4e16cb5dd6c 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -27,6 +27,7 @@ import 
org.apache.flink.table.gateway.api.results.FetchOrientation;
 import org.apache.flink.table.gateway.api.results.OperationInfo;
 import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 
@@ -45,8 +46,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
-import static org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
-
 /** Manager for the {@link Operation}. */
 @Internal
 public class OperationManager {
@@ -348,7 +347,7 @@ public class OperationManager {
             } else if (currentStatus == OperationStatus.RUNNING
                     || currentStatus == OperationStatus.PENDING
                     || currentStatus == OperationStatus.INITIALIZED) {
-                return NOT_READY_RESULT;
+                return NotReadyResult.INSTANCE;
             } else {
                 throw new SqlGatewayException(
                         String.format(
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
index e56b6024c02..45e61f9295e 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/NotReadyResult.java
@@ -30,7 +30,7 @@ import java.util.List;
 /** To represent that the execution result is not ready to fetch. */
 public class NotReadyResult implements ResultSet {
 
-    public static final NotReadyResult NOT_READY_RESULT = new NotReadyResult();
+    public static final NotReadyResult INSTANCE = new NotReadyResult();
 
     private NotReadyResult() {}
 
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
index 463b567518b..f14ea52468c 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/AbstractSqlGatewayStatementITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.gateway;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.service.utils.Constants;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.table.gateway.utils.SqlScriptReader;
 import org.apache.flink.table.gateway.utils.TestSqlStatement;
@@ -28,16 +29,19 @@ import org.apache.flink.table.utils.print.PrintStyle;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
 import org.apache.flink.test.junit5.MiniClusterExtension;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.MethodSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +63,7 @@ import java.util.Map;
 import java.util.jar.JarEntry;
 import java.util.jar.JarFile;
 import java.util.regex.Pattern;
-import java.util.stream.Stream;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.table.gateway.utils.SqlScriptReader.HINT_START_OF_OUTPUT;
 import static 
org.apache.flink.table.planner.utils.TableTestUtil.replaceNodeIdInOperator;
@@ -68,6 +72,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base ITCase tests for statements. */
+@ExtendWith(ParameterizedTestExtension.class)
 public abstract class AbstractSqlGatewayStatementITCase extends 
AbstractTestBase {
 
     private static final Logger LOG =
@@ -89,6 +94,13 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
 
     private final Map<String, String> replaceVars = new HashMap<>();
 
+    @Parameter public TestParameters parameters;
+
+    @Parameters(name = "parameters={0}")
+    public static List<TestParameters> parameters() throws Exception {
+        return listFlinkSqlTests().stream().map(TestParameters::new).collect(Collectors.toList());
+    }
+
     @BeforeAll
     public static void setUp() {
         service = SQL_GATEWAY_SERVICE_EXTENSION.getService();
@@ -111,11 +123,10 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
                 
Files.createDirectory(temporaryFolder.resolve("batch_ctas")).toFile().getPath());
     }
 
-    @ParameterizedTest
-    @MethodSource("listFlinkSqlTests")
-    public void testFlinkSqlStatements(String sqlPath) throws Exception {
-        resetSessionForFlinkSqlStatements();
-        runTest(sqlPath);
+    @TestTemplate
+    public void testFlinkSqlStatements() throws Exception {
+        prepareEnvironment();
+        runTest(parameters.getSqlPath());
     }
 
     /**
@@ -153,6 +164,25 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
     // Utility
     // 
-------------------------------------------------------------------------------------------
 
+    /** Parameters of the test spec. */
+    protected static class TestParameters {
+
+        protected final String sqlPath;
+
+        public TestParameters(String sqlPath) {
+            this.sqlPath = sqlPath;
+        }
+
+        public String getSqlPath() {
+            return sqlPath;
+        }
+
+        @Override
+        public String toString() {
+            return "TestParameters{" + "sqlPath='" + sqlPath + '\'' + '}';
+        }
+    }
+
     /** Mark the output type. */
     public enum Tag {
         INFO("!info"),
@@ -222,7 +252,7 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
                 values);
     }
 
-    private static Stream<String> listFlinkSqlTests() throws Exception {
+    protected static List<String> listFlinkSqlTests() throws Exception {
         final File jarFile =
                 new File(
                         AbstractSqlGatewayStatementITCase.class
@@ -244,14 +274,13 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
                     }
                 }
             }
-            return files.stream();
+            return files;
         } else {
             return listTestSpecInTheSameModule(RESOURCE_DIR);
         }
     }
 
-    protected static Stream<String> listTestSpecInTheSameModule(String resourceDir)
-            throws Exception {
+    protected static List<String> listTestSpecInTheSameModule(String resourceDir) throws Exception {
         return IOUtils.readLines(
                         checkNotNull(
                                 AbstractSqlGatewayStatementITCase.class
@@ -259,7 +288,8 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
                                         .getResourceAsStream(resourceDir)),
                         StandardCharsets.UTF_8)
                 .stream()
-                .map(name -> Paths.get(resourceDir, name).toString());
+                .map(name -> Paths.get(resourceDir, name).toString())
+                .collect(Collectors.toList());
     }
 
     protected void runTest(String sqlPath) throws Exception {
@@ -269,7 +299,7 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
         assertThat(String.join("", runStatements(testSqlStatements))).isEqualTo(in);
     }
 
-    protected void resetSessionForFlinkSqlStatements() throws Exception {}
+    protected void prepareEnvironment() throws Exception {}
 
     /**
      * Returns printed results for each ran SQL statements.
@@ -295,6 +325,11 @@ public abstract class AbstractSqlGatewayStatementITCase 
extends AbstractTestBase
                                     replaceNodeIdInOperator(
                                             
iterator.next().getString(0).toString()))
                             + "\n");
+        } else if (schema.getColumn(0)
+                .map(col -> col.getName().equals(Constants.JOB_ID))
+                .orElse(false)) {
+            // ignore output of the job id
+            return Tag.INFO.addTag("Job ID:\n");
         } else {
             ByteArrayOutputStream outContent = new ByteArrayOutputStream();
             PrintStyle style =
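
For readers unfamiliar with Flink's ParameterizedTestExtension, which replaces the JUnit @ParameterizedTest/@MethodSource usage above (apparently so that subclasses such as the REST IT case below can redefine the parameter set and add a RowFormat dimension), a minimal sketch of the pattern; the class name and values are made up:

    import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
    import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
    import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;

    import org.junit.jupiter.api.TestTemplate;
    import org.junit.jupiter.api.extension.ExtendWith;

    import java.util.Arrays;
    import java.util.List;

    @ExtendWith(ParameterizedTestExtension.class)
    class ParameterizedPatternSketch {

        @Parameter public String value;

        @Parameters(name = "value={0}")
        public static List<String> parameters() {
            return Arrays.asList("a", "b");
        }

        // Runs once for every element returned by parameters(), with `value` injected.
        @TestTemplate
        void runsOncePerParameter() {}
    }
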
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
index 47c26b4f543..de6a3519b72 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/OperationRelatedITCase.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.table.gateway.rest.message.operation.OperationMessagePar
 import 
org.apache.flink.table.gateway.rest.message.operation.OperationStatusResponseBody;
 import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionRequestBody;
 import 
org.apache.flink.table.gateway.rest.message.session.OpenSessionResponseBody;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
 
 import org.junit.jupiter.api.Test;
 
@@ -44,7 +45,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -129,7 +129,7 @@ class OperationRelatedITCase extends RestAPIITCaseBase {
                                         TimeUnit.SECONDS.sleep(10);
                                     } catch (InterruptedException ignored) {
                                     }
-                                    return NOT_READY_RESULT;
+                                    return NotReadyResult.INSTANCE;
                                 });
         assertThat(operationHandle).isNotNull();
         return Arrays.asList(sessionHandleId, 
operationHandle.getIdentifier().toString());
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
index 2970d2f25a1..fb27468392a 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/RestAPIITCaseBase.java
@@ -40,8 +40,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.concurrent.CompletableFuture;
 
-import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
 import static 
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
index e52a42ee354..dac0519a8b2 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java
@@ -70,8 +70,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
 import static 
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
index 4fc378e9029..f87558f171a 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointStatementITCase.java
@@ -37,13 +37,16 @@ import 
org.apache.flink.table.gateway.rest.header.statement.FetchResultsHeaders;
 import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParameters;
 import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementRequestBody;
 import 
org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody;
+import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters;
 import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody;
-import 
org.apache.flink.table.gateway.rest.message.statement.FetchResultsTokenParameters;
 import org.apache.flink.table.gateway.rest.serde.ResultInfo;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
 import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
+import 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils;
 import org.apache.flink.table.gateway.rest.util.TestingRestClient;
 import 
org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
 import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -51,6 +54,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Order;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.nio.file.Path;
 import java.time.Duration;
@@ -58,9 +63,13 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static org.apache.flink.table.api.internal.StaticResultProvider.SIMPLE_ROW_DATA_TO_STRING_CONVERTER;
 import static 
org.apache.flink.table.gateway.rest.util.TestingRestClient.getTestingRestClient;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -70,7 +79,10 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
  * Test basic logic of handlers inherited from {@link 
AbstractSqlGatewayRestHandler} in statement
  * related cases.
  */
-class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
+public class SqlGatewayRestEndpointStatementITCase extends AbstractSqlGatewayStatementITCase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SqlGatewayRestEndpointStatementITCase.class);
 
     @RegisterExtension
     @Order(3)
@@ -82,7 +94,7 @@ class SqlGatewayRestEndpointStatementITCase extends 
AbstractSqlGatewayStatementI
             ExecuteStatementHeaders.getInstance();
     private static SessionMessageParameters sessionMessageParameters;
     private static final FetchResultsHeaders fetchResultsHeaders =
-            FetchResultsHeaders.getInstance();
+            FetchResultsHeaders.getDefaultInstance();
     private static final int OPERATION_WAIT_SECONDS = 100;
 
     private static final String PATTERN1 = "Caused by: ";
@@ -105,6 +117,17 @@ class SqlGatewayRestEndpointStatementITCase extends 
AbstractSqlGatewayStatementI
         restClient.shutdown();
     }
 
+    @Parameters(name = "parameters={0}")
+    public static List<TestParameters> parameters() throws Exception {
+        return listFlinkSqlTests().stream()
+                .flatMap(
+                        path ->
+                                Stream.of(
+                                        new RestTestParameters(path, RowFormat.JSON),
+                                        new RestTestParameters(path, RowFormat.PLAIN_TEXT)))
+                .collect(Collectors.toList());
+    }
+
     @BeforeEach
     @Override
     public void before(@TempDir Path temporaryFolder) throws Exception {
@@ -153,11 +176,8 @@ class SqlGatewayRestEndpointStatementITCase extends 
AbstractSqlGatewayStatementI
         ResultInfo resultInfo = fetchResultsResponseBody.getResults();
         assertThat(resultInfo).isNotNull();
 
-        String resultType = fetchResultsResponseBody.getResultType();
-        assertThat(
-                        Arrays.asList(
-                                ResultSet.ResultType.PAYLOAD.name(),
-                                ResultSet.ResultType.EOS.name()))
+        ResultSet.ResultType resultType = fetchResultsResponseBody.getResultType();
+        assertThat(Arrays.asList(ResultSet.ResultType.PAYLOAD, ResultSet.ResultType.EOS))
                 .contains(resultType);
 
         ResolvedSchema resultSchema = resultInfo.getResultSchema();
@@ -165,25 +185,31 @@ class SqlGatewayRestEndpointStatementITCase extends 
AbstractSqlGatewayStatementI
         return toString(
                 StatementType.match(statement),
                 resultSchema,
-                new RowDataToStringConverterImpl(
-                        resultSchema.toPhysicalRowDataType(),
-                        DateTimeUtils.UTC_ZONE.toZoneId(),
-                        SqlGatewayRestEndpointStatementITCase.class.getClassLoader(),
-                        false),
+                ((RestTestParameters) parameters).getRowFormat() == RowFormat.JSON
+                        ? new RowDataToStringConverterImpl(
+                                resultSchema.toPhysicalRowDataType(),
+                                DateTimeUtils.UTC_ZONE.toZoneId(),
+                                SqlGatewayRestEndpointStatementITCase.class.getClassLoader(),
+                                false)
+                        : SIMPLE_ROW_DATA_TO_STRING_CONVERTER,
                 new RowDataIterator(sessionHandle, operationHandle));
     }
 
     FetchResultsResponseBody fetchResults(
             SessionHandle sessionHandle, OperationHandle operationHandle, Long token)
             throws Exception {
-        FetchResultsTokenParameters fetchResultsTokenParameters =
-                new FetchResultsTokenParameters(sessionHandle, operationHandle, token);
+        FetchResultsMessageParameters fetchResultsMessageParameters =
+                new FetchResultsMessageParameters(
+                        sessionHandle,
+                        operationHandle,
+                        token,
+                        ((RestTestParameters) parameters).getRowFormat());
         CompletableFuture<FetchResultsResponseBody> response =
                 restClient.sendRequest(
                         SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetAddress(),
                         SQL_GATEWAY_REST_ENDPOINT_EXTENSION.getTargetPort(),
                         fetchResultsHeaders,
-                        fetchResultsTokenParameters,
+                        fetchResultsMessageParameters,
                         EmptyRequestBody.getInstance());
         return response.get();
     }
@@ -202,6 +228,31 @@ class SqlGatewayRestEndpointStatementITCase extends 
AbstractSqlGatewayStatementI
                 .equals(RuntimeExecutionMode.STREAMING);
     }
 
+    private static class RestTestParameters extends TestParameters {
+
+        private final RowFormat rowFormat;
+
+        public RestTestParameters(String sqlPath, RowFormat rowFormat) {
+            super(sqlPath);
+            this.rowFormat = rowFormat;
+        }
+
+        public RowFormat getRowFormat() {
+            return rowFormat;
+        }
+
+        @Override
+        public String toString() {
+            return "RestTestParameters{"
+                    + "sqlPath='"
+                    + sqlPath
+                    + '\''
+                    + ", rowFormat="
+                    + rowFormat
+                    + '}';
+        }
+    }
+
     private class RowDataIterator implements Iterator<RowData> {
 
         private final SessionHandle sessionHandle;
@@ -223,10 +274,11 @@ class SqlGatewayRestEndpointStatementITCase extends 
AbstractSqlGatewayStatementI
                 try {
                     fetch();
                 } catch (Exception ignored) {
+                    LOG.error("Failed to fetch results.", ignored);
                 }
             }
 
-            return token != null;
+            return fetchedRows.hasNext();
         }
 
         @Override
@@ -238,18 +290,10 @@ class SqlGatewayRestEndpointStatementITCase extends 
AbstractSqlGatewayStatementI
             FetchResultsResponseBody fetchResultsResponseBody =
                     fetchResults(sessionHandle, operationHandle, token);
 
-            String nextResultUri = fetchResultsResponseBody.getNextResultUri();
-            token = parseTokenFromUri(nextResultUri);
-
+            token =
+                    SqlGatewayRestEndpointTestUtils.parseToken(
+                            fetchResultsResponseBody.getNextResultUri());
             fetchedRows = fetchResultsResponseBody.getResults().getData().iterator();
         }
     }
-
-    private static Long parseTokenFromUri(String uri) {
-        if (uri == null || uri.length() == 0) {
-            return null;
-        }
-        String[] split = uri.split("/");
-        return Long.valueOf(split[split.length - 1]);
-    }
 }
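
The RowDataIterator above drains one page per fetch. A sketch, assuming it were written as a plain helper inside the same test class (so it can reuse fetchResults and the test-utils token parser; java.util.ArrayList would also need to be imported), of the equivalent paging loop:

    // Hypothetical helper, equivalent to RowDataIterator: follow nextResultUri until the
    // token runs out. Assumes the operation is already producing results; the iterator
    // above additionally retries while the result is not ready.
    private List<RowData> fetchAllRows(SessionHandle sessionHandle, OperationHandle operationHandle)
            throws Exception {
        List<RowData> rows = new ArrayList<>();
        Long token = 0L;
        while (token != null) {
            FetchResultsResponseBody page = fetchResults(sessionHandle, operationHandle, token);
            rows.addAll(page.getResults().getData());
            token = SqlGatewayRestEndpointTestUtils.parseToken(page.getNextResultUri());
        }
        return rows;
    }
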
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
index 0fa3389317f..e1410b8b8de 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointTest.java
@@ -30,8 +30,8 @@ import org.junit.jupiter.api.Test;
 
 import static 
org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
 import static 
org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory.IDENTIFIER;
-import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static 
org.apache.flink.table.gateway.rest.util.RestConfigUtils.getSqlGatewayRestOptionFullName;
+import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static 
org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getSqlGatewayRestOptionFullName;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
index 0fe21bf4cf1..5efeb7d0a79 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/StatementRelatedITCase.java
@@ -29,8 +29,11 @@ import 
org.apache.flink.table.gateway.rest.message.session.SessionMessageParamet
 import 
org.apache.flink.table.gateway.rest.message.statement.CompleteStatementRequestBody;
 import 
org.apache.flink.table.gateway.rest.message.statement.CompleteStatementResponseBody;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -43,20 +46,25 @@ import static org.assertj.core.api.Assertions.assertThat;
  */
 public class StatementRelatedITCase extends RestAPIITCaseBase {
 
-    @Test
-    public void testCompleteStatement() throws Exception {
+    private SessionHandle sessionHandle;
+    private SessionMessageParameters sessionMessageParameters;
+    private @TempDir Path tempDir;
+
+    @BeforeEach
+    public void setUp() throws Exception {
         CompletableFuture<OpenSessionResponseBody> response =
                 sendRequest(
                         OpenSessionHeaders.getInstance(),
                         EmptyMessageParameters.getInstance(),
                         new OpenSessionRequestBody(null, null));
 
-        SessionHandle sessionHandle =
-                new 
SessionHandle(UUID.fromString(response.get().getSessionHandle()));
+        sessionHandle = new 
SessionHandle(UUID.fromString(response.get().getSessionHandle()));
 
-        SessionMessageParameters sessionMessageParameters =
-                new SessionMessageParameters(sessionHandle);
+        sessionMessageParameters = new SessionMessageParameters(sessionHandle);
+    }
 
+    @Test
+    public void testCompleteStatement() throws Exception {
         CompletableFuture<CompleteStatementResponseBody> 
completeStatementResponse =
                 sendRequest(
                         CompleteStatementHeaders.getINSTANCE(),
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
index fe86784f78c..5cb765be07e 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/serde/ResultInfoJsonSerDeTest.java
@@ -22,8 +22,13 @@ import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.gateway.rest.util.RowFormat;
+import 
org.apache.flink.table.planner.functions.casting.RowDataToStringConverterImpl;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.DateTimeUtils;
+import org.apache.flink.table.utils.print.RowDataToStringConverter;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
@@ -33,7 +38,9 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.Si
 
 import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -45,7 +52,6 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.ZoneOffset;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -54,6 +60,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.api.DataTypes.ARRAY;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
@@ -76,7 +83,7 @@ import static 
org.apache.flink.table.api.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZON
 import static org.apache.flink.table.api.DataTypes.TINYINT;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link ResultInfoJsonSerializer} and {@link ResultInfoJsonDeserializer}. */
+/** Tests for {@link ResultInfoSerializer} and {@link ResultInfoDeserializer}. */
 public class ResultInfoJsonSerDeTest {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private static final Row testRow = initRow();
@@ -84,69 +91,75 @@ public class ResultInfoJsonSerDeTest {
     @BeforeAll
     public static void setUp() {
         SimpleModule simpleModule = new SimpleModule();
-        simpleModule.addSerializer(ResultInfo.class, new ResultInfoJsonSerializer());
-        simpleModule.addDeserializer(ResultInfo.class, new ResultInfoJsonDeserializer());
+        simpleModule.addSerializer(ResultInfo.class, new ResultInfoSerializer());
+        simpleModule.addDeserializer(ResultInfo.class, new ResultInfoDeserializer());
         OBJECT_MAPPER.registerModule(simpleModule);
     }
 
-    @Test
-    public void testResultInfoSerDeWithSingleRow() throws Exception {
-        serDeTest(Collections.singletonList(testRow));
+    @ParameterizedTest
+    @EnumSource(RowFormat.class)
+    public void testResultInfoSerDeWithSingleRow(RowFormat rowFormat) throws Exception {
+        serDeTest(Collections.singletonList(testRow), rowFormat);
     }
 
-    @Test
-    public void testResultInfoSerDeWithMultiRowData() throws Exception {
-        List<Row> rows = new ArrayList<>();
-        for (int i = 0; i < 10; i++) {
-            rows.add(testRow);
-        }
-        serDeTest(rows);
+    @ParameterizedTest
+    @EnumSource(RowFormat.class)
+    public void testResultInfoSerDeWithMultiRowData(RowFormat rowFormat) throws Exception {
+        serDeTest(Collections.nCopies(10, testRow), rowFormat);
     }
 
-    @Test
-    public void testResultInfoSerDeWithNullValues() throws Exception {
-        List<Row> rows = new ArrayList<>();
-        List<Integer> positions = new ArrayList<>();
-        for (int i = 0; i < 18; i++) {
-            positions.add(new Random().nextInt(18));
-        }
-        for (int i = 0; i < 10; i++) {
-            rows.add(getTestRowDataWithNullValues(initRow(), positions));
-        }
-        serDeTest(rows);
+    @ParameterizedTest
+    @EnumSource(RowFormat.class)
+    public void testResultInfoSerDeWithNullValues(RowFormat rowFormat) throws Exception {
+        List<Integer> positions =
+                IntStream.range(0, 18)
+                        .mapToObj(i -> new Random().nextInt(18))
+                        .collect(Collectors.toList());
+
+        serDeTest(
+                Collections.nCopies(10, 
getTestRowDataWithNullValues(initRow(), positions)),
+                rowFormat);
     }
 
-    @Test
-    public void testDeserializationFromJson() throws Exception {
-        URL url = ResultInfoJsonSerDeTest.class.getClassLoader().getResource("resultInfo.txt");
+    @ParameterizedTest
+    @ValueSource(strings = {"result_info_json_format.txt", "result_info_plain_text_format.txt"})
+    public void testDeserializationFromJson(String fileName) throws Exception {
+        URL url = ResultInfoJsonSerDeTest.class.getClassLoader().getResource(fileName);
         String input =
                 IOUtils.toString(Preconditions.checkNotNull(url), StandardCharsets.UTF_8).trim();
         ResultInfo deserializedResult = OBJECT_MAPPER.readValue(input, ResultInfo.class);
         assertThat(OBJECT_MAPPER.writeValueAsString(deserializedResult)).isEqualTo(input);
     }
 
-    private void serDeTest(List<Row> rows) throws IOException {
-        List<RowData> rowDataList =
+    private void serDeTest(List<Row> rows, RowFormat rowFormat) throws 
IOException {
+        List<RowData> rowDatas =
                 
rows.stream().map(this::convertToInternal).collect(Collectors.toList());
+        if (rowFormat == RowFormat.PLAIN_TEXT) {
+            rowDatas =
+                    rowDatas.stream()
+                            .map(this::toPlainTextFormatRowData)
+                            .collect(Collectors.toList());
+        }
+
         ResolvedSchema testResolvedSchema = getTestResolvedSchema(getFields());
         ResultInfo testResultInfo =
                 new ResultInfo(
                         testResolvedSchema.getColumns().stream()
                                 .map(ColumnInfo::toColumnInfo)
                                 .collect(Collectors.toList()),
-                        rowDataList);
+                        rowDatas,
+                        rowFormat);
 
         // test serialization & deserialization
         String result = OBJECT_MAPPER.writeValueAsString(testResultInfo);
         ResultInfo resultInfo = OBJECT_MAPPER.readValue(result, 
ResultInfo.class);
 
+        // validate schema
         assertThat(resultInfo.getResultSchema().toString())
                 .isEqualTo(testResultInfo.getResultSchema().toString());
 
-        List<RowData> data = resultInfo.getData();
-        for (int i = 0; i < data.size(); i++) {
-            assertThat(convertToExternal(data.get(i), 
ROW(getFields()))).isEqualTo(rows.get(i));
-        }
+        // validate data
+        assertDataWithFormat(resultInfo.getData(), rows, rowFormat);
     }
 
     private static Row initRow() {
@@ -269,4 +282,42 @@ public class ResultInfoJsonSerDeTest {
                 DataFormatConverters.getConverterForDataType(ROW(getFields()));
         return converter.toInternal(row);
     }
+
+    private RowData toPlainTextFormatRowData(RowData rowData) {
+        RowDataToStringConverter converter =
+                new RowDataToStringConverterImpl(
+                        
getTestResolvedSchema(getFields()).toPhysicalRowDataType(),
+                        DateTimeUtils.UTC_ZONE.toZoneId(),
+                        ResultInfoJsonSerDeTest.class.getClassLoader(),
+                        false);
+
+        StringData[] plainText =
+                Arrays.stream(converter.convert(rowData))
+                        .map(StringData::fromString)
+                        .toArray(StringData[]::new);
+
+        return GenericRowData.ofKind(rowData.getRowKind(), (Object[]) 
plainText);
+    }
+
+    private void assertDataWithFormat(
+            List<RowData> expected, List<Row> actual, RowFormat rowFormat) {
+        if (rowFormat == RowFormat.JSON) {
+            for (int i = 0; i < expected.size(); i++) {
+                assertThat(convertToExternal(expected.get(i), 
ROW(getFields())))
+                        .isEqualTo(actual.get(i));
+            }
+        } else {
+            for (int i = 0; i < expected.size(); i++) {
+                assertPlainTextFormatData(
+                        expected.get(i),
+                        
toPlainTextFormatRowData(convertToInternal(actual.get(i))));
+            }
+        }
+    }
+
+    private void assertPlainTextFormatData(RowData expected, RowData actual) {
+        for (int i = 0; i < expected.getArity(); i++) {
+            assertThat(expected.getString(i)).isEqualTo(actual.getString(i));
+        }
+    }
 }
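
A non-normative sketch of the round trip the test above exercises, reusing the names from ResultInfoJsonSerDeTest; schema and rows are placeholders, OBJECT_MAPPER is the mapper configured by the test, and the three-argument ResultInfo constructor is the one the test itself calls:

    List<ColumnInfo> columns =
            schema.getColumns().stream()
                    .map(ColumnInfo::toColumnInfo)
                    .collect(Collectors.toList());
    ResultInfo original = new ResultInfo(columns, rows, RowFormat.PLAIN_TEXT); // or RowFormat.JSON
    String json = OBJECT_MAPPER.writeValueAsString(original);
    ResultInfo restored = OBJECT_MAPPER.readValue(json, ResultInfo.class);

In PLAIN_TEXT the rows are expected to be string-converted first (see toPlainTextFormatRowData above); in JSON the internal RowData values are kept as-is.
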
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java
index 4116669e756..2b69d5f68f7 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointExtension.java
@@ -32,8 +32,8 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.function.Supplier;
 
-import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getBaseConfig;
-import static org.apache.flink.table.gateway.rest.util.RestConfigUtils.getFlinkConfig;
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
+import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getFlinkConfig;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** A simple {@link Extension} that manages the lifecycle of the {@link SqlGatewayRestEndpoint}. */
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/RestConfigUtils.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
similarity index 85%
rename from flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/RestConfigUtils.java
rename to flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
index 09092e17180..177427b1523 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/RestConfigUtils.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/util/SqlGatewayRestEndpointTestUtils.java
@@ -21,13 +21,15 @@ package org.apache.flink.table.gateway.rest.util;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getEndpointConfig;
 import static org.apache.flink.table.gateway.api.endpoint.SqlGatewayEndpointFactoryUtils.getSqlGatewayOptionPrefix;
 import static org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory.IDENTIFIER;
 import static org.apache.flink.table.gateway.rest.SqlGatewayRestEndpointFactory.rebuildRestEndpointOptions;
 
 /** The tools to get configuration in test cases. */
-public class RestConfigUtils {
+public class SqlGatewayRestEndpointTestUtils {
 
     /** Get the full name of sql gateway rest endpoint options. */
     public static String getSqlGatewayRestOptionFullName(String key) {
@@ -62,4 +64,16 @@ public class RestConfigUtils {
         }
         return config;
     }
+
+    /** Parse token from the result uri. */
+    public static @Nullable Long parseToken(@Nullable String nextResultUri) {
+        if (nextResultUri == null || nextResultUri.length() == 0) {
+            return null;
+        }
+        String[] split = nextResultUri.split("/");
+        // remove query string
+        String s = split[split.length - 1];
+        s = s.replaceAll("\\?.*", "");
+        return Long.valueOf(s);
+    }
 }
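
A usage sketch for the new parseToken helper (the URI below is illustrative only; the helper simply takes the last path segment and strips any query string, such as the new rowFormat parameter):

    Long token =
            SqlGatewayRestEndpointTestUtils.parseToken(
                    "/sessions/<session>/operations/<operation>/result/7?rowFormat=JSON");
    // token == 7L; a null or empty nextResultUri yields null
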
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index cd9993e7fed..03d338fed83 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -54,6 +54,7 @@ import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.result.NotReadyResult;
 import org.apache.flink.table.gateway.service.session.SessionManager;
 import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
@@ -108,7 +109,6 @@ import static org.apache.flink.table.functions.FunctionKind.AGGREGATE;
 import static org.apache.flink.table.functions.FunctionKind.OTHER;
 import static org.apache.flink.table.functions.FunctionKind.SCALAR;
 import static org.apache.flink.table.gateway.api.results.ResultSet.ResultType.PAYLOAD;
-import static org.apache.flink.table.gateway.service.result.NotReadyResult.NOT_READY_RESULT;
 import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
 import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.createInitializedSession;
 import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.fetchResults;
@@ -288,7 +288,7 @@ public class SqlGatewayServiceITCase {
 
         startRunningLatch.await();
         assertThat(fetchResults(service, sessionHandle, operationHandle))
-                .isEqualTo(NOT_READY_RESULT);
+                .isEqualTo(NotReadyResult.INSTANCE);
         endRunningLatch.countDown();
     }
 
diff --git a/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt b/flink-table/flink-sql-gateway/src/test/resources/result_info_json_format.txt
similarity index 55%
rename from flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt
rename to flink-table/flink-sql-gateway/src/test/resources/result_info_json_format.txt
index 42b972a757e..486f9a54e66 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/resultInfo.txt
+++ b/flink-table/flink-sql-gateway/src/test/resources/result_info_json_format.txt
@@ -1 +1 @@
-{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment"
 [...]
+{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment"
 [...]
diff --git a/flink-table/flink-sql-gateway/src/test/resources/result_info_plain_text_format.txt b/flink-table/flink-sql-gateway/src/test/resources/result_info_plain_text_format.txt
new file mode 100644
index 00000000000..a640abfc9cc
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/result_info_plain_text_format.txt
@@ -0,0 +1 @@
+{"columns":[{"name":"bool","logicalType":{"type":"BOOLEAN","nullable":true},"comment":null},{"name":"tinyint","logicalType":{"type":"TINYINT","nullable":true},"comment":null},{"name":"smallint","logicalType":{"type":"SMALLINT","nullable":true},"comment":null},{"name":"int","logicalType":{"type":"INTEGER","nullable":true},"comment":null},{"name":"bigint","logicalType":{"type":"BIGINT","nullable":true},"comment":null},{"name":"float","logicalType":{"type":"FLOAT","nullable":true},"comment"
 [...]
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q
index 56479dc4826..a36e200a606 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/begin_statement_set.q
@@ -73,17 +73,6 @@ create table StreamingTable2 (
 1 row in set
 !ok
 
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'c6d2eb2ade68485fb4d1848294150861';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 BEGIN STATEMENT SET;
 !output
 +--------+
@@ -136,23 +125,8 @@ INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'),
 
 END;
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| c6d2eb2ade68485fb4d1848294150861 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 SELECT * FROM StreamingTable;
 !output
@@ -217,17 +191,6 @@ create table BatchTable (
 1 row in set
 !ok
 
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'e66a9bbf66c04a41b4d687e26c8296a0';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 BEGIN STATEMENT SET;
 !output
 +--------+
@@ -260,23 +223,8 @@ INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2,
 
 END;
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| e66a9bbf66c04a41b4d687e26c8296a0 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 SELECT * FROM BatchTable;
 !output
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
index 295b822cff0..323f24b81f1 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/insert.q
@@ -56,26 +56,10 @@ create table StreamingTable (
 1 row in set
 !ok
 
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'e68e7fabddfade4f42910980652582dc';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| e68e7fabddfade4f42910980652582dc |
-+----------------------------------+
-1 row in set
-!ok
+Job ID:
+!info
 
 RESET '$internal.pipeline.job-id';
 !output
@@ -134,36 +118,10 @@ create table BatchTable (
 1 row in set
 !ok
 
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = '29ba2263b9b86bd8a14b91487941bfe7';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| 29ba2263b9b86bd8a14b91487941bfe7 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 SELECT * FROM BatchTable;
 !output
@@ -181,17 +139,6 @@ SELECT * FROM BatchTable;
 7 rows in set
 !ok
 
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = '84c7408a08c284d5736e50d3f5a648be';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 CREATE TABLE CtasTable
 WITH (
   'connector' = 'filesystem',
@@ -200,23 +147,8 @@ WITH (
 )
 AS SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')) T(id, str);
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| 84c7408a08c284d5736e50d3f5a648be |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 SELECT * FROM CtasTable;
 !output
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
index 9f95973c831..63e9ccf2a75 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql/statement_set.q
@@ -120,39 +120,13 @@ Sink(table=[default_catalog.default_database.StreamingTable], fields=[EXPR$0, EX
 +- Reused(reference_id=[1])
 !ok
 
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = 'a5513ca0a886c6c9bafaf3acac43bfa5';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 EXECUTE STATEMENT SET BEGIN
 INSERT INTO StreamingTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 INSERT INTO StreamingTable2 SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 END;
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| a5513ca0a886c6c9bafaf3acac43bfa5 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 SELECT * FROM StreamingTable;
 !output
@@ -235,7 +209,6 @@ str string
 1 row in set
 !ok
 
-
 create table BatchTable2 (
 id int,
 str string
@@ -285,40 +258,14 @@ Sink(table=[default_catalog.default_database.BatchTable2], fields=[EXPR$0, EXPR$
 +- Reused(reference_id=[1])
 !ok
 
-# test only to verify the test job id.
-SET '$internal.pipeline.job-id' = '2e2dc0a5a6315296062ba81eba340668';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
-
 EXECUTE STATEMENT SET
 BEGIN
 INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 INSERT INTO BatchTable SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
 END;
 !output
-+----------------------------------+
-|                           job id |
-+----------------------------------+
-| 2e2dc0a5a6315296062ba81eba340668 |
-+----------------------------------+
-1 row in set
-!ok
-
-RESET '$internal.pipeline.job-id';
-!output
-+--------+
-| result |
-+--------+
-|     OK |
-+--------+
-1 row in set
-!ok
+Job ID:
+!info
 
 SELECT * FROM BatchTable;
 !output
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot
index ae9bf88d5ca..8e5b27d3b6c 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v1.snapshot
@@ -242,19 +242,7 @@
       "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
     },
     "response" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:FetchResultsResponseBody",
-      "properties" : {
-        "results" : {
-          "type" : "any"
-        },
-        "resultType" : {
-          "type" : "string"
-        },
-        "nextResultUri" : {
-          "type" : "string"
-        }
-      }
+      "type" : "any"
     }
   }, {
     "url" : "/sessions/:session_handle/operations/:operation_handle/status",
diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
index 92824ac0c44..f4d14c6089f 100644
--- a/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
+++ b/flink-table/flink-sql-gateway/src/test/resources/sql_gateway_rest_api_v2.snapshot
@@ -301,26 +301,17 @@
       } ]
     },
     "query-parameters" : {
-      "queryParameters" : [ ]
+      "queryParameters" : [ {
+        "key" : "rowFormat",
+        "mandatory" : true
+      } ]
     },
     "request" : {
       "type" : "object",
       "id" : 
"urn:jsonschema:org:apache:flink:runtime:rest:messages:EmptyRequestBody"
     },
     "response" : {
-      "type" : "object",
-      "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:FetchResultsResponseBody",
-      "properties" : {
-        "results" : {
-          "type" : "any"
-        },
-        "resultType" : {
-          "type" : "string"
-        },
-        "nextResultUri" : {
-          "type" : "string"
-        }
-      }
+      "type" : "any"
     }
   }, {
     "url" : "/sessions/:session_handle/operations/:operation_handle/status",
@@ -391,4 +382,4 @@
       }
     }
   } ]
-}
+}
\ No newline at end of file
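
With the v2 snapshot change above, rowFormat becomes a mandatory query parameter of the fetch-results endpoint, so clients append it explicitly. An illustrative (not normative) request line, with placeholders for the handles and token:

    GET /sessions/<session_handle>/operations/<operation_handle>/result/<token>?rowFormat=JSON

PLAIN_TEXT is the other format value exercised by the tests in this commit.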
