This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 626bdacdbe303d9e9c346e7ba7f3d8c7b42b1d8e
Author: Shengkai <1059623...@qq.com>
AuthorDate: Wed Jun 22 20:21:54 2022 +0800

    [FLINK-27766][sql-gateway] Introduce the OperationManager for the SqlGatewayService
---
 .../flink/table/gateway/api/SqlGatewayService.java |  69 ++++
 .../gateway/api/operation/OperationHandle.java     |  66 ++++
 .../gateway/api/operation/OperationStatus.java     |  89 +++++
 .../table/gateway/api/operation/OperationType.java |  31 ++
 .../table/gateway/api/results/OperationInfo.java   |  69 ++++
 .../flink/table/gateway/api/results/ResultSet.java | 131 +++++++
 .../gateway/api/operation/OperationStatusTest.java | 113 ++++++
 .../gateway/service/SqlGatewayServiceImpl.java     |  64 ++++
 .../gateway/service/context/SessionContext.java    |  24 +-
 .../table/gateway/service/operation/Operation.java | 160 +++++++++
 .../service/operation/OperationManager.java        | 199 +++++++++++
 .../gateway/service/result/ResultFetcher.java      | 173 ++++++++++
 .../table/gateway/service/result/ResultStore.java  | 149 ++++++++
 .../table/gateway/service/session/Session.java     |   8 +
 .../gateway/service/session/SessionManager.java    |   8 +-
 .../gateway/service/SqlGatewayServiceITCase.java   | 382 +++++++++++++++++++++
 .../gateway/service/result/ResultFetcherTest.java  | 364 ++++++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 ++
 18 files changed, 2123 insertions(+), 4 deletions(-)

diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
index 933c797f243..c0bdf01c36e 100644
--- 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java
@@ -19,11 +19,16 @@
 package org.apache.flink.table.gateway.api;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
 
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /** A service of SQL gateway is responsible for handling requests from the 
endpoints. */
 @PublicEvolving
@@ -55,4 +60,68 @@ public interface SqlGatewayService {
      * @return Returns configuration of the session.
      */
     Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws 
SqlGatewayException;
+
+    // 
-------------------------------------------------------------------------------------------
+    // Operation Management
+    // 
-------------------------------------------------------------------------------------------
+
+    /**
+     * Submit an operation and execute it. The {@link SqlGatewayService} takes care of the
+     * execution and assigns an {@link OperationHandle} that can be used to retrieve the
+     * results later.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param type describes the operation type.
+     * @param executor the main logic that produces the execution results.
+     * @return Returns the handle used to retrieve the results later.
+     */
+    OperationHandle submitOperation(
+            SessionHandle sessionHandle, OperationType type, 
Callable<ResultSet> executor)
+            throws SqlGatewayException;
+
+    /**
+     * Cancel the operation if it has not reached a terminal status.
+     *
+     * <p>An Operation that is already terminated can not be cancelled.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     */
+    void cancelOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+            throws SqlGatewayException;
+
+    /**
+     * Close the operation and release all resources used by the operation.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     */
+    void closeOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+            throws SqlGatewayException;
+
+    /**
+     * Get the {@link OperationInfo} of the operation.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     */
+    OperationInfo getOperationInfo(SessionHandle sessionHandle, 
OperationHandle operationHandle)
+            throws SqlGatewayException;
+
+    // 
-------------------------------------------------------------------------------------------
+    // Statements
+    // 
-------------------------------------------------------------------------------------------
+
+    /**
+     * Fetch the results from the operation. When maxRows is Integer.MAX_VALUE, all
+     * available data is fetched.
+     *
+     * @param sessionHandle handle to identify the session.
+     * @param operationHandle handle to identify the operation.
+     * @param token token to identify results.
+     * @param maxRows max number of rows to fetch.
+     * @return Returns the results.
+     */
+    ResultSet fetchResults(
+            SessionHandle sessionHandle, OperationHandle operationHandle, long 
token, int maxRows)
+            throws SqlGatewayException;
 }
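
To make the contract above concrete, the following is a minimal, hypothetical sketch of
how an endpoint might drive the interface end to end: submit a Callable that produces a
ResultSet, poll fetchResults with increasing tokens until EOS, and then close the
operation. The `service` and `sessionHandle` variables and the `buildResultSet()` helper
are assumptions for illustration; they are not part of this commit.

    // Submit the work; the gateway assigns a handle for later retrieval.
    OperationHandle handle =
            service.submitOperation(
                    sessionHandle,
                    OperationType.EXECUTE_STATEMENT,
                    () -> buildResultSet()); // hypothetical Callable<ResultSet>

    // Poll with increasing tokens until the gateway signals end of stream.
    long token = 0;
    while (true) {
        ResultSet rs = service.fetchResults(sessionHandle, handle, token, 100);
        if (rs.getResultType() == ResultSet.ResultType.EOS) {
            break; // all data has been consumed
        } else if (rs.getResultType() == ResultSet.ResultType.PAYLOAD) {
            rs.getData().forEach(System.out::println);
            token = rs.getNextToken(); // advance to the next batch
        }
        // NOT_READY: the operation is still running; retry with the same token.
    }
    service.closeOperation(sessionHandle, handle);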
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java
new file mode 100644
index 00000000000..dfdb3a6b173
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.HandleIdentifier;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** {@link OperationHandle} to index the {@code Operation}. */
+@PublicEvolving
+public class OperationHandle {
+
+    private final HandleIdentifier identifier;
+
+    public static OperationHandle create() {
+        return new OperationHandle(new HandleIdentifier(UUID.randomUUID(), 
UUID.randomUUID()));
+    }
+
+    public OperationHandle(HandleIdentifier identifier) {
+        this.identifier = identifier;
+    }
+
+    public HandleIdentifier getIdentifier() {
+        return identifier;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof OperationHandle)) {
+            return false;
+        }
+        OperationHandle that = (OperationHandle) o;
+        return Objects.equals(identifier, that.identifier);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(identifier);
+    }
+
+    @Override
+    public String toString() {
+        return identifier.toString();
+    }
+}
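
Because the handle is a plain value object around HandleIdentifier, equality is
identifier-based, which is what lets handles serve as map keys in the OperationManager
introduced below. A small sketch of the semantics:

    HandleIdentifier id = new HandleIdentifier(UUID.randomUUID(), UUID.randomUUID());
    OperationHandle a = new OperationHandle(id);
    OperationHandle b = new OperationHandle(id);

    // Equal and hash-equal despite being distinct instances.
    assert a.equals(b) && a.hashCode() == b.hashCode();

    // create() draws two fresh UUIDs, so freshly created handles never collide.
    assert !OperationHandle.create().equals(OperationHandle.create());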
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationStatus.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationStatus.java
new file mode 100644
index 00000000000..4288276c19f
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationStatus.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Status to describe the {@code Operation}. */
+@PublicEvolving
+public enum OperationStatus {
+    /** The operation is newly created. */
+    INITIALIZED(false),
+
+    /** Prepare the resources for the operation. */
+    PENDING(false),
+
+    /** The operation is running. */
+    RUNNING(false),
+
+    /** All the work is finished and the results are ready for the client to fetch. */
+    FINISHED(true),
+
+    /** Operation has been cancelled. */
+    CANCELED(true),
+
+    /** Operation has been closed and all related resources are collected. */
+    CLOSED(true),
+
+    /** Some error happened during the execution. */
+    ERROR(true),
+
+    /** The execution of the operation timed out. */
+    TIMEOUT(true);
+
+    private final boolean isTerminalStatus;
+
+    OperationStatus(boolean isTerminalStatus) {
+        this.isTerminalStatus = isTerminalStatus;
+    }
+
+    public static boolean isValidStatusTransition(
+            OperationStatus fromStatus, OperationStatus toStatus) {
+        return toOperationStatusSet(fromStatus).contains(toStatus);
+    }
+
+    public boolean isTerminalStatus() {
+        return isTerminalStatus;
+    }
+
+    private static Set<OperationStatus> toOperationStatusSet(OperationStatus 
fromStatus) {
+        switch (fromStatus) {
+            case INITIALIZED:
+                return new HashSet<>(Arrays.asList(PENDING, CANCELED, CLOSED, 
TIMEOUT, ERROR));
+            case PENDING:
+                return new HashSet<>(Arrays.asList(RUNNING, CANCELED, CLOSED, 
TIMEOUT, ERROR));
+            case RUNNING:
+                return new HashSet<>(Arrays.asList(FINISHED, CANCELED, CLOSED, 
TIMEOUT, ERROR));
+            case FINISHED:
+            case CANCELED:
+            case TIMEOUT:
+            case ERROR:
+                return Collections.singleton(CLOSED);
+            case CLOSED:
+                return Collections.emptySet();
+            default:
+                throw new IllegalArgumentException("Unknown from status: " + 
fromStatus);
+        }
+    }
+}
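
The enum encodes a small state machine: the non-terminal statuses advance
INITIALIZED -> PENDING -> RUNNING, every non-terminal status may jump to a terminal one,
and terminal statuses only allow CLOSED. A brief illustration of the guard (mirroring how
Operation#updateState uses it later in this commit):

    // Valid: a running operation may finish.
    OperationStatus.isValidStatusTransition(
            OperationStatus.RUNNING, OperationStatus.FINISHED); // true

    // Invalid: terminal statuses only allow CLOSED, and CLOSED allows nothing.
    OperationStatus.isValidStatusTransition(
            OperationStatus.FINISHED, OperationStatus.RUNNING); // false
    OperationStatus.isValidStatusTransition(
            OperationStatus.CLOSED, OperationStatus.CANCELED); // false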
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
new file mode 100644
index 00000000000..babe99fd3f9
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** The Operation Type. */
+@PublicEvolving
+public enum OperationType {
+    /** The type indicates the operation executes statements. */
+    EXECUTE_STATEMENT,
+
+    /** The type indicates the operation is unknown. */
+    UNKNOWN;
+}
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
new file mode 100644
index 00000000000..7ebb28f5169
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+
+import java.util.Objects;
+
+/** Information of the {@code Operation}. */
+@PublicEvolving
+public class OperationInfo {
+
+    private final OperationStatus status;
+    private final boolean hasResults;
+    private final OperationType type;
+
+    public OperationInfo(OperationStatus status, OperationType type, boolean 
hasResults) {
+        this.status = status;
+        this.type = type;
+        this.hasResults = hasResults;
+    }
+
+    public boolean isHasResults() {
+        return hasResults;
+    }
+
+    public OperationType getType() {
+        return type;
+    }
+
+    public OperationStatus getStatus() {
+        return status;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof OperationInfo)) {
+            return false;
+        }
+        OperationInfo that = (OperationInfo) o;
+        return hasResults == that.hasResults && status == that.status && type 
== that.type;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(status, hasResults, type);
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
new file mode 100644
index 00000000000..3750a1cee95
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** The collection of the results. */
+@PublicEvolving
+public class ResultSet {
+
+    private final ResultType resultType;
+
+    @Nullable private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+
+    public static final ResultSet NOT_READY_RESULTS =
+            new ResultSet(
+                    ResultType.NOT_READY,
+                    0L,
+                    ResolvedSchema.of(Collections.emptyList()),
+                    Collections.emptyList());
+
+    public ResultSet(
+            ResultType resultType,
+            @Nullable Long nextToken,
+            ResolvedSchema resultSchema,
+            List<RowData> data) {
+        this.nextToken = nextToken;
+        this.resultType = resultType;
+        this.resultSchema = resultSchema;
+        this.data = data;
+    }
+
+    /** Get the type of the results, which may indicate the result is EOS or 
has data. */
+    public ResultType getResultType() {
+        return resultType;
+    }
+
+    /**
+     * The token indicates the next batch of the data.
+     *
+     * <p>When the token is null, it means all the data has been fetched.
+     */
+    public @Nullable Long getNextToken() {
+        return nextToken;
+    }
+
+    /** The schema of the data. */
+    public ResolvedSchema getResultSchema() {
+        return resultSchema;
+    }
+
+    /** All the data in the current results. */
+    public List<RowData> getData() {
+        return data;
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "ResultSet{\n"
+                        + "  resultType=%s,\n"
+                        + "  nextToken=%s,\n"
+                        + "  resultSchema=%s,\n"
+                        + "  data=[%s]\n"
+                        + "}",
+                resultType,
+                nextToken,
+                resultSchema.toString(),
+                
data.stream().map(Object::toString).collect(Collectors.joining(",")));
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ResultSet)) {
+            return false;
+        }
+        ResultSet resultSet = (ResultSet) o;
+        return resultType == resultSet.resultType
+                && Objects.equals(nextToken, resultSet.nextToken)
+                && Objects.equals(resultSchema, resultSet.resultSchema)
+                && Objects.equals(data, resultSet.data);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(resultType, nextToken, resultSchema, data);
+    }
+
+    /** Describes the kind of the result. */
+    public enum ResultType {
+        /** Indicates the result is not ready. */
+        NOT_READY,
+
+        /** Indicates the result has data. */
+        PAYLOAD,
+
+        /** Indicates all results have been fetched. */
+        EOS
+    }
+}
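
For context, a ResultSet handed back by an executor Callable might be assembled as
follows. This is a sketch using Flink's ResolvedSchema, Column, and GenericRowData; the
column name and row value are made up for illustration:

    ResolvedSchema schema =
            ResolvedSchema.of(Column.physical("result", DataTypes.STRING()));

    ResultSet resultSet =
            new ResultSet(
                    ResultSet.ResultType.PAYLOAD,
                    0L, // placeholder; the gateway computes the real tokens itself
                    schema,
                    Collections.singletonList(
                            GenericRowData.of(StringData.fromString("hello"))));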
diff --git 
a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/operation/OperationStatusTest.java
 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/operation/OperationStatusTest.java
new file mode 100644
index 00000000000..4fb9b04c83c
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/operation/OperationStatusTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.operation;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.CANCELED;
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.CLOSED;
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.ERROR;
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.FINISHED;
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.INITIALIZED;
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.PENDING;
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.RUNNING;
+import static 
org.apache.flink.table.gateway.api.operation.OperationStatus.TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link OperationStatus}. */
+public class OperationStatusTest {
+
+    @Test
+    public void testSupportedTransition() {
+        for (OperationStatusTransition transition :
+                getSupportedOperationStatusTransition()) {
+            assertTrue(
+                    OperationStatus.isValidStatusTransition(transition.from, transition.to));
+        }
+    }
+
+    @Test
+    public void testUnsupportedTransition() {
+        Set<OperationStatusTransition> supported = 
getSupportedOperationStatusTransition();
+        for (OperationStatus from : OperationStatus.values()) {
+            for (OperationStatus to : OperationStatus.values()) {
+                if (supported.contains(new OperationStatusTransition(from, 
to))) {
+                    continue;
+                }
+
+                assertFalse(OperationStatus.isValidStatusTransition(from, to));
+            }
+        }
+    }
+
+    private Set<OperationStatusTransition> 
getSupportedOperationStatusTransition() {
+        return new HashSet<>(
+                Arrays.asList(
+                        new OperationStatusTransition(INITIALIZED, PENDING),
+                        new OperationStatusTransition(PENDING, RUNNING),
+                        new OperationStatusTransition(RUNNING, FINISHED),
+                        new OperationStatusTransition(INITIALIZED, CANCELED),
+                        new OperationStatusTransition(PENDING, CANCELED),
+                        new OperationStatusTransition(RUNNING, CANCELED),
+                        new OperationStatusTransition(INITIALIZED, TIMEOUT),
+                        new OperationStatusTransition(PENDING, TIMEOUT),
+                        new OperationStatusTransition(RUNNING, TIMEOUT),
+                        new OperationStatusTransition(INITIALIZED, ERROR),
+                        new OperationStatusTransition(PENDING, ERROR),
+                        new OperationStatusTransition(RUNNING, ERROR),
+                        new OperationStatusTransition(INITIALIZED, CLOSED),
+                        new OperationStatusTransition(PENDING, CLOSED),
+                        new OperationStatusTransition(RUNNING, CLOSED),
+                        new OperationStatusTransition(CANCELED, CLOSED),
+                        new OperationStatusTransition(TIMEOUT, CLOSED),
+                        new OperationStatusTransition(ERROR, CLOSED),
+                        new OperationStatusTransition(FINISHED, CLOSED)));
+    }
+
+    private static class OperationStatusTransition {
+        OperationStatus from;
+        OperationStatus to;
+
+        public OperationStatusTransition(OperationStatus from, OperationStatus 
to) {
+            this.from = from;
+            this.to = to;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof OperationStatusTransition)) {
+                return false;
+            }
+            OperationStatusTransition that = (OperationStatusTransition) o;
+            return from == that.from && to == that.to;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(from, to);
+        }
+    }
+}
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
index 2606f8f7763..39e253c020b 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java
@@ -20,6 +20,10 @@ package org.apache.flink.table.gateway.service;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.gateway.api.SqlGatewayService;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
@@ -30,6 +34,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.concurrent.Callable;
 
 /** The implementation of the {@link SqlGatewayService} interface. */
 public class SqlGatewayServiceImpl implements SqlGatewayService {
@@ -73,6 +78,65 @@ public class SqlGatewayServiceImpl implements 
SqlGatewayService {
         }
     }
 
+    @Override
+    public OperationHandle submitOperation(
+            SessionHandle sessionHandle, OperationType type, 
Callable<ResultSet> executor)
+            throws SqlGatewayException {
+        try {
+            return 
getSession(sessionHandle).getOperationManager().submitOperation(type, executor);
+        } catch (Throwable e) {
+            LOG.error("Failed to submitOperation.", e);
+            throw new SqlGatewayException("Failed to submitOperation.", e);
+        }
+    }
+
+    @Override
+    public void cancelOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle) {
+        try {
+            
getSession(sessionHandle).getOperationManager().cancelOperation(operationHandle);
+        } catch (Throwable t) {
+            LOG.error("Failed to cancelOperation.", t);
+            throw new SqlGatewayException("Failed to cancelOperation.", t);
+        }
+    }
+
+    @Override
+    public void closeOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle) {
+        try {
+            
getSession(sessionHandle).getOperationManager().closeOperation(operationHandle);
+        } catch (Throwable t) {
+            LOG.error("Failed to closeOperation.", t);
+            throw new SqlGatewayException("Failed to closeOperation.", t);
+        }
+    }
+
+    @Override
+    public OperationInfo getOperationInfo(
+            SessionHandle sessionHandle, OperationHandle operationHandle) {
+        try {
+            return getSession(sessionHandle)
+                    .getOperationManager()
+                    .getOperationInfo(operationHandle);
+        } catch (Throwable t) {
+            LOG.error("Failed to getOperationInfo.", t);
+            throw new SqlGatewayException("Failed to getOperationInfo.", t);
+        }
+    }
+
+    @Override
+    public ResultSet fetchResults(
+            SessionHandle sessionHandle, OperationHandle operationHandle, long 
token, int maxRows)
+            throws SqlGatewayException {
+        try {
+            return getSession(sessionHandle)
+                    .getOperationManager()
+                    .fetchResults(operationHandle, token, maxRows);
+        } catch (Throwable t) {
+            LOG.error("Failed to fetchResults.", t);
+            throw new SqlGatewayException("Failed to fetchResults.", t);
+        }
+    }
+
     @VisibleForTesting
     Session getSession(SessionHandle sessionHandle) {
         return sessionManager.getSession(sessionHandle);
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 83bc5758ce0..b2e3921ba07 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -37,6 +37,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.module.ModuleManager;
 
 import org.slf4j.Logger;
@@ -51,6 +52,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Context describing a session, it's mainly used for user to open a new 
session in the backend. If
@@ -69,17 +71,21 @@ public class SessionContext {
     private final SessionState sessionState;
     private final URLClassLoader userClassloader;
 
+    private final OperationManager operationManager;
+
     private SessionContext(
             SessionHandle sessionId,
             EndpointVersion endpointVersion,
             Configuration sessionConf,
             URLClassLoader classLoader,
-            SessionState sessionState) {
+            SessionState sessionState,
+            OperationManager operationManager) {
         this.sessionId = sessionId;
         this.endpointVersion = endpointVersion;
         this.sessionConf = sessionConf;
         this.userClassloader = classLoader;
         this.sessionState = sessionState;
+        this.operationManager = operationManager;
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -108,6 +114,8 @@ public class SessionContext {
         } catch (IOException e) {
             LOG.debug("Error while closing class loader.", e);
         }
+
+        operationManager.close();
     }
 
     // 
--------------------------------------------------------------------------------------------
@@ -118,7 +126,8 @@ public class SessionContext {
             DefaultContext defaultContext,
             SessionHandle sessionId,
             EndpointVersion endpointVersion,
-            Configuration sessionConf) {
+            Configuration sessionConf,
+            ExecutorService operationExecutorService) {
         // 
--------------------------------------------------------------------------------------------------------------
         // Init config
         // 
--------------------------------------------------------------------------------------------------------------
@@ -159,7 +168,12 @@ public class SessionContext {
                 new SessionState(catalogManager, moduleManager, 
functionCatalog);
 
         return new SessionContext(
-                sessionId, endpointVersion, configuration, classLoader, 
sessionState);
+                sessionId,
+                endpointVersion,
+                configuration,
+                classLoader,
+                sessionState,
+                new OperationManager(operationExecutorService));
     }
 
     private static URLClassLoader buildClassLoader(
@@ -204,6 +218,10 @@ public class SessionContext {
                 userClassloader);
     }
 
+    public OperationManager getOperationManager() {
+        return operationManager;
+    }
+
     private TableEnvironmentInternal createStreamTableEnvironment(
             StreamExecutionEnvironment env,
             EnvironmentSettings settings,
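
The wiring above gives every SessionContext its own OperationManager while the
ExecutorService is injected from outside, so a gateway-wide pool can be shared across
sessions. A hypothetical creation call, where defaultContext, sessionId, endpointVersion,
and sessionConf are assumed to come from the surrounding SessionManager code:

    ExecutorService operationPool = Executors.newFixedThreadPool(8);
    SessionContext context =
            SessionContext.create(
                    defaultContext, sessionId, endpointVersion, sessionConf, operationPool);

    context.getOperationManager(); // per-session manager over the shared pool
    context.close();               // also closes the session's OperationManager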
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java
new file mode 100644
index 00000000000..2262563edc6
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
+/** Operation to manage the execution, results and so on. */
+public class Operation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
+
+    private final OperationHandle operationHandle;
+
+    private final OperationType operationType;
+    private final boolean hasResults;
+    private final AtomicReference<OperationStatus> status;
+
+    private final Callable<ResultFetcher> resultSupplier;
+
+    private Future<?> invocation;
+    private volatile ResultFetcher resultFetcher;
+    private volatile SqlExecutionException operationError;
+
+    public Operation(
+            OperationHandle operationHandle,
+            OperationType operationType,
+            Callable<ResultFetcher> resultSupplier) {
+        this.operationHandle = operationHandle;
+        this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+        this.operationType = operationType;
+        this.hasResults = true;
+        this.resultSupplier = resultSupplier;
+    }
+
+    void runBefore() {
+        updateState(OperationStatus.RUNNING);
+    }
+
+    void runAfter() {
+        updateState(OperationStatus.FINISHED);
+    }
+
+    public void run(ExecutorService service) {
+        updateState(OperationStatus.PENDING);
+        invocation =
+                service.submit(
+                        () -> {
+                            try {
+                                runBefore();
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Exception e) {
+                                String msg =
+                                        String.format(
+                                                "Failed to execute the 
operation %s.",
+                                                operationHandle);
+                                LOG.error(msg, e);
+                                operationError = new 
SqlExecutionException(msg, e);
+                                updateState(OperationStatus.ERROR);
+                            }
+                        });
+    }
+
+    public void cancel() {
+        updateState(OperationStatus.CANCELED);
+        closeResources();
+    }
+
+    public void close() {
+        updateState(OperationStatus.CLOSED);
+        closeResources();
+    }
+
+    public ResultSet fetchResults(long token, int maxRows) {
+        OperationStatus currentStatus = status.get();
+
+        if (currentStatus == OperationStatus.ERROR) {
+            throw operationError;
+        } else if (currentStatus == OperationStatus.FINISHED) {
+            return resultFetcher.fetchResults(token, maxRows);
+        } else if (currentStatus == OperationStatus.RUNNING
+                || currentStatus == OperationStatus.PENDING
+                || currentStatus == OperationStatus.INITIALIZED) {
+            return NOT_READY_RESULTS;
+        } else {
+            throw new SqlGatewayException(
+                    String.format(
+                            "Can not fetch results from the %s in %s status.",
+                            operationHandle, currentStatus));
+        }
+    }
+
+    public OperationInfo getOperationInfo() {
+        return new OperationInfo(status.get(), operationType, hasResults);
+    }
+
+    private void updateState(OperationStatus toStatus) {
+        OperationStatus currentStatus;
+        do {
+            currentStatus = status.get();
+            boolean isValid = 
OperationStatus.isValidStatusTransition(currentStatus, toStatus);
+            if (!isValid) {
+                String message =
+                        String.format(
+                                "Failed to convert the Operation Status from 
%s to %s for %s.",
+                                currentStatus, toStatus, operationHandle);
+                LOG.error(message);
+                throw new SqlGatewayException(message);
+            }
+        } while (!status.compareAndSet(currentStatus, toStatus));
+
+        LOG.debug(
+                String.format(
+                        "Convert operation %s from %s to %s.",
+                        operationHandle, currentStatus, toStatus));
+    }
+
+    private void closeResources() {
+        if (invocation != null && !invocation.isDone()) {
+            invocation.cancel(true);
+        }
+
+        if (resultFetcher != null) {
+            resultFetcher.close();
+        }
+    }
+}
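
Putting the lifecycle in one place: run(...) moves the status from INITIALIZED to PENDING
and submits a task that drives PENDING -> RUNNING -> FINISHED (or ERROR), while
fetchResults returns NOT_READY_RESULTS until the fetcher exists. A minimal driver,
assuming a pre-built ResultFetcher named resultFetcher:

    ExecutorService pool = Executors.newSingleThreadExecutor();
    Operation op =
            new Operation(
                    OperationHandle.create(),
                    OperationType.UNKNOWN,
                    () -> resultFetcher); // hypothetical pre-built ResultFetcher

    op.run(pool);                          // INITIALIZED -> PENDING -> RUNNING
    ResultSet rs = op.fetchResults(0, 10); // NOT_READY_RESULTS until FINISHED
    op.close();                            // any live status -> CLOSED, resources released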
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
new file mode 100644
index 00000000000..878674a0e72
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/** Manager for the {@link Operation}. */
+public class OperationManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OperationManager.class);
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Map<OperationHandle, Operation> submittedOperations;
+    private final ExecutorService service;
+
+    private boolean isRunning;
+
+    public OperationManager(ExecutorService service) {
+        this.service = service;
+        this.submittedOperations = new HashMap<>();
+        this.isRunning = true;
+    }
+
+    /**
+     * Submit the operation to the {@link OperationManager}. The {@link OperationManager}
+     * manages the lifecycle of the {@link Operation}, including registering resources,
+     * firing the execution, and so on.
+     *
+     * @param operationType The type of the submitted operation.
+     * @param executor Worker to execute.
+     * @return OperationHandle to fetch the results or check the status.
+     */
+    public OperationHandle submitOperation(
+            OperationType operationType, Callable<ResultSet> executor) {
+        OperationHandle handle = OperationHandle.create();
+        Operation operation =
+                new Operation(
+                        handle,
+                        operationType,
+                        () -> {
+                            ResultSet resultSet = executor.call();
+                            List<RowData> rows = resultSet.getData();
+                            return new ResultFetcher(
+                                    handle,
+                                    resultSet.getResultSchema(),
+                                    
CloseableIterator.adapterForIterator(rows.iterator()),
+                                    rows.size());
+                        });
+
+        writeLock(
+                () -> {
+                    submittedOperations.put(handle, operation);
+                    operation.run(service);
+                });
+        return handle;
+    }
+
+    /**
+     * Cancel the execution of the operation.
+     *
+     * @param operationHandle identifies the {@link Operation}.
+     */
+    public void cancelOperation(OperationHandle operationHandle) {
+        getOperation(operationHandle).cancel();
+    }
+
+    /**
+     * Close the operation and release all resources used by the {@link 
Operation}.
+     *
+     * @param operationHandle identifies the {@link Operation}.
+     */
+    public void closeOperation(OperationHandle operationHandle) {
+        writeLock(
+                () -> {
+                    Operation opToRemove = 
submittedOperations.remove(operationHandle);
+                    if (opToRemove != null) {
+                        opToRemove.close();
+                    }
+                });
+    }
+
+    /**
+     * Get the {@link OperationInfo} of the operation.
+     *
+     * @param operationHandle identifies the {@link Operation}.
+     */
+    public OperationInfo getOperationInfo(OperationHandle operationHandle) {
+        return getOperation(operationHandle).getOperationInfo();
+    }
+
+    /**
+     * Get the results of the operation.
+     *
+     * @param operationHandle identifies the {@link Operation}.
+     * @param token identifies which batch of data to fetch.
+     * @param maxRows the maximum number of rows to fetch.
+     * @return ResultSet contains the results.
+     */
+    public ResultSet fetchResults(OperationHandle operationHandle, long token, 
int maxRows) {
+        return getOperation(operationHandle).fetchResults(token, maxRows);
+    }
+
+    /** Closes the {@link OperationManager} and all operations. */
+    public void close() {
+        lock.writeLock().lock();
+        try {
+            isRunning = false;
+            for (Operation operation : submittedOperations.values()) {
+                operation.close();
+            }
+            submittedOperations.clear();
+        } finally {
+            lock.writeLock().unlock();
+        }
+        LOG.debug("Closes the Operation Manager.");
+    }
+
+    // 
-------------------------------------------------------------------------------------------
+
+    @VisibleForTesting
+    public int getOperationCount() {
+        return submittedOperations.size();
+    }
+
+    @VisibleForTesting
+    public Operation getOperation(OperationHandle operationHandle) {
+        return readLock(
+                () -> {
+                    Operation operation = 
submittedOperations.get(operationHandle);
+                    if (operation == null) {
+                        throw new SqlGatewayException(
+                                String.format(
+                                        "Can not find the submitted operation 
in the OperationManager with the %s.",
+                                        operationHandle));
+                    }
+                    return operation;
+                });
+    }
+
+    private void writeLock(Runnable runner) {
+        lock.writeLock().lock();
+        try {
+            if (!isRunning) {
+                throw new SqlGatewayException("The OperationManager is 
closed.");
+            }
+            runner.run();
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private <T> T readLock(Supplier<T> supplier) {
+        lock.readLock().lock();
+        try {
+            if (!isRunning) {
+                throw new SqlGatewayException("The OperationManager is 
closed.");
+            }
+            return supplier.get();
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+}
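
In the gateway the manager is always reached through the session (see the SessionContext
change above), but its API can also be exercised directly. A hedged sketch, assuming an
executor Callable<ResultSet> named executor:

    OperationManager manager =
            new OperationManager(Executors.newFixedThreadPool(2));
    OperationHandle handle =
            manager.submitOperation(OperationType.EXECUTE_STATEMENT, executor);

    OperationInfo info = manager.getOperationInfo(handle); // status, type, hasResults
    // May be NOT_READY_RESULTS while the executor is still running.
    ResultSet rs = manager.fetchResults(handle, 0, Integer.MAX_VALUE);

    manager.closeOperation(handle); // removes and closes this single operation
    manager.close();                // closes everything and rejects new submissions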
diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
new file mode 100644
index 00000000000..145639552dd
--- /dev/null
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A fetcher to fetch results from the submitted statement.
+ *
+ * <p>The fetcher uses the {@link Iterator} model: every time the results are fetched with
+ * the current token, the fetcher moves forward and retires the old data.
+ *
+ * <p>After close, the fetcher no longer fetches results from the remote source but can
+ * still return all data in the local cache.
+ */
+public class ResultFetcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResultFetcher.class);
+
+    private final OperationHandle operationHandle;
+
+    private final ResolvedSchema resultSchema;
+    private final ResultStore resultStore;
+    private final LinkedList<RowData> bufferedResults = new LinkedList<>();
+    private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+
+    private long currentToken = 0;
+    private boolean noMoreResults = false;
+
+    public ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            CloseableIterator<RowData> resultRows,
+            int maxBufferSize) {
+        this.operationHandle = operationHandle;
+        this.resultSchema = resultSchema;
+        this.resultStore = new ResultStore(resultRows, maxBufferSize);
+    }
+
+    public void close() {
+        resultStore.close();
+    }
+
+    /**
+     * Fetch results from the result store. It first tries to return the data cached in the
+     * buffer. If the buffer is empty, it fetches results from the {@link ResultStore}.
+     * Multiple threads may try to fetch results in parallel; to keep the data consistent,
+     * the method is synchronized so that only one thread can fetch results at a time.
+     * TODO: forbid fetching results concurrently in FLINK-28053.
+     */
+    public synchronized ResultSet fetchResults(long token, int maxFetchSize) {
+        if (maxFetchSize <= 0) {
+            throw new IllegalArgumentException("The max rows should be larger 
than 0.");
+        }
+
+        if (token == currentToken) {
+            // equal to the Iterator.next()
+            if (noMoreResults) {
+                LOG.debug("There is no more result for operation: {}.", 
operationHandle);
+                return new ResultSet(
+                        ResultSet.ResultType.EOS, null, resultSchema, 
Collections.emptyList());
+            }
+
+            // a new token arrives, move the current buffer data into the prev 
buffered results.
+            bufferedPrevResults.clear();
+            if (bufferedResults.isEmpty()) {
+                // buffered results have been totally consumed,
+                // so try to fetch new results
+                Optional<List<RowData>> newResults = 
resultStore.retrieveRecords();
+
+                if (newResults.isPresent()) {
+                    bufferedResults.addAll(newResults.get());
+                } else {
+                    noMoreResults = true;
+                    return new ResultSet(
+                            ResultSet.ResultType.EOS, null, resultSchema, 
Collections.emptyList());
+                }
+            }
+
+            int resultSize = Math.min(bufferedResults.size(), maxFetchSize);
+            LOG.debug(
+                    "Fetching current result for operation: {}, token: {}, 
maxFetchSize: {}, resultSize: {}.",
+                    operationHandle,
+                    token,
+                    maxFetchSize,
+                    resultSize);
+
+            // move forward
+            currentToken++;
+            // move result to buffer
+            for (int i = 0; i < resultSize; i++) {
+                bufferedPrevResults.add(bufferedResults.removeFirst());
+            }
+            return new ResultSet(
+                    ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, 
bufferedPrevResults);
+        } else if (token == currentToken - 1 && token >= 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Fetching previous result for operation: {}, token: 
{}, maxFetchSize: {}",
+                        operationHandle,
+                        token,
+                        maxFetchSize);
+            }
+            if (maxFetchSize < bufferedPrevResults.size()) {
+                String msg =
+                        String.format(
+                                "As the same token is provided, the fetch size must not be less"
+                                        + " than the previously returned buffer size. The previous"
+                                        + " result size is %s, the current max_fetch_size is %s.",
+                                bufferedPrevResults.size(), maxFetchSize);
+                if (LOG.isDebugEnabled()) {
+                    LOG.error(msg);
+                }
+                throw new SqlExecutionException(msg);
+            }
+            return new ResultSet(
+                    ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, 
bufferedPrevResults);
+        } else {
+            String msg;
+            if (currentToken == 0) {
+                msg = "Expecting token to be 0, but found " + token + ".";
+            } else {
+                msg =
+                        "Expecting token to be "
+                                + currentToken
+                                + " or "
+                                + (currentToken - 1)
+                                + ", but found "
+                                + token
+                                + ".";
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.error(msg);
+            }
+            throw new SqlExecutionException(msg);
+        }
+    }
+
+    @VisibleForTesting
+    public ResultStore getResultStore() {
+        return resultStore;
+    }
+}
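
The token contract is worth spelling out: fetching with the current token advances the
fetcher, while re-fetching with the previous token replays the last buffered batch, which
lets a client recover from a lost response. A sketch, assuming a handle, schema, and
CloseableIterator<RowData> of rows are at hand:

    ResultFetcher fetcher =
            new ResultFetcher(handle, schema, rows, /* maxBufferSize */ 100);

    ResultSet first = fetcher.fetchResults(0, 10);  // token 0: returns the first batch
    ResultSet replay = fetcher.fetchResults(0, 10); // token 0 again: replays that batch
    ResultSet second = fetcher.fetchResults(1, 10); // token 1: advances to the next batch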
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
new file mode 100644
index 00000000000..180da8f9e59
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** A store that reads rows from the result iterator on a background thread and buffers them. */
+public class ResultStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ResultStore.class);
+
+    private final CloseableIterator<RowData> result;
+    private final List<RowData> recordsBuffer = new ArrayList<>();
+    private final int maxBufferSize;
+
+    private final Object resultLock = new Object();
+    private final AtomicReference<SqlExecutionException> executionException =
+            new AtomicReference<>();
+    private final ResultRetrievalThread retrievalThread = new ResultRetrievalThread();
+
+    public ResultStore(CloseableIterator<RowData> result, int maxBufferSize) {
+        this.result = result;
+        this.maxBufferSize = maxBufferSize;
+        this.retrievalThread.start();
+    }
+
+    public void close() {
+        retrievalThread.isRunning = false;
+        retrievalThread.interrupt();
+
+        try {
+            result.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close the ResultStore. Ignore the error.", e);
+        }
+    }
+
+    public Optional<List<RowData>> retrieveRecords() {
+        synchronized (resultLock) {
+            // the retrieval thread is alive: return records if available,
+            // but only if no execution error has occurred
+            if (isRetrieving() && executionException.get() == null) {
+                if (recordsBuffer.isEmpty()) {
+                    return Optional.of(Collections.emptyList());
+                } else {
+                    final List<RowData> change = new ArrayList<>(recordsBuffer);
+                    recordsBuffer.clear();
+                    resultLock.notify();
+                    return Optional.of(change);
+                }
+            }
+            // retrieval thread is dead but there is still a record to be delivered
+            else if (!isRetrieving() && !recordsBuffer.isEmpty()) {
+                final List<RowData> change = new ArrayList<>(recordsBuffer);
+                recordsBuffer.clear();
+                return Optional.of(change);
+            }
+            // no results can be returned anymore
+            else {
+                return handleMissingResult();
+            }
+        }
+    }
+
+    @VisibleForTesting
+    public int getBufferedRecordSize() {
+        synchronized (resultLock) {
+            return recordsBuffer.size();
+        }
+    }
+
+    private boolean isRetrieving() {
+        return retrievalThread.isRunning;
+    }
+
+    private Optional<List<RowData>> handleMissingResult() {
+        if (executionException.get() != null) {
+            throw executionException.get();
+        }
+
+        // we assume that a bounded job finished
+        return Optional.empty();
+    }
+
+    private void processRecord(RowData row) {
+        synchronized (resultLock) {
+            // wait if the buffer is full
+            if (recordsBuffer.size() >= maxBufferSize) {
+                try {
+                    resultLock.wait();
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+            recordsBuffer.add(row);
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    /** Thread to retrieve data from the {@link CloseableIterator} and process it. */
+    private class ResultRetrievalThread extends Thread {
+        public volatile boolean isRunning = true;
+
+        @Override
+        public void run() {
+            try {
+                while (isRunning && result.hasNext()) {
+                    processRecord(result.next());
+                }
+            } catch (RuntimeException e) {
+                executionException.compareAndSet(
+                        null, new SqlExecutionException("Error while retrieving result.", e));
+            }
+
+            // no result anymore
+            // either the job is done or an error occurred
+            isRunning = false;
+        }
+    }
+}
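
ResultStore pairs a producer (the ResultRetrievalThread) with a consumer
(retrieveRecords) over a bounded buffer guarded by resultLock: the producer blocks
once maxBufferSize rows are buffered, and each successful drain wakes it up again.
Stripped of the Flink types, the handoff reduces to the sketch below; BoundedHandoff
is a hypothetical illustration, not part of this commit.

    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical, simplified version of the ResultStore handoff. */
    final class BoundedHandoff<T> {

        private final Object lock = new Object();
        private final List<T> buffer = new ArrayList<>();
        private final int maxBufferSize;

        BoundedHandoff(int maxBufferSize) {
            this.maxBufferSize = maxBufferSize;
        }

        /** Producer side: blocks while the buffer is full. */
        void put(T element) throws InterruptedException {
            synchronized (lock) {
                while (buffer.size() >= maxBufferSize) {
                    lock.wait(); // woken up by drain()
                }
                buffer.add(element);
            }
        }

        /** Consumer side: returns everything buffered and wakes the producer. */
        List<T> drain() {
            synchronized (lock) {
                List<T> change = new ArrayList<>(buffer);
                buffer.clear();
                lock.notify(); // let a blocked put() continue
                return change;
            }
        }
    }

One deliberate difference: the sketch re-checks the capacity condition in a loop
after wait(), the usual guard against spurious wakeups. ResultStore uses a plain
if, which at worst lets the buffer overshoot maxBufferSize by one row.
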
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
index 28fc1eb81cf..57cb04dbc25 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.session;
 
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
 
 import java.io.Closeable;
 import java.util.Map;
@@ -27,6 +28,9 @@ import java.util.Map;
 /**
  * Similar to HTTP Session, which could maintain user identity and store user-specific data during
  * multiple request/response interactions between a client and the gateway server.
+ *
+ * <p>TODO: make operation execution in sequence in
+ * https://issues.apache.org/jira/browse/FLINK-28053
  */
 public class Session implements Closeable {
 
@@ -53,6 +57,10 @@ public class Session implements Closeable {
         return sessionContext.getConfigMap();
     }
 
+    public OperationManager getOperationManager() {
+        return sessionContext.getOperationManager();
+    }
+
     @Override
     public void close() {
         sessionContext.close();
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
index 7d1457bf3d9..77a3097783d 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java
@@ -152,7 +152,8 @@ public class SessionManager {
                         defaultContext,
                         sessionId,
                         environment.getSessionEndpointVersion(),
-                        Configuration.fromMap(environment.getSessionConfig()));
+                        Configuration.fromMap(environment.getSessionConfig()),
+                        operationExecutorService);
         session = new Session(sessionContext);
         sessions.put(sessionId, session);
 
@@ -211,4 +212,9 @@ public class SessionManager {
     int currentSessionCount() {
         return sessions.size();
     }
+
+    @VisibleForTesting
+    public int getOperationCount(SessionHandle sessionHandle) {
+        return getSession(sessionHandle).getOperationManager().getOperationCount();
+    }
 }
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
index 048af7401c8..52d74fb882e 100644
--- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java
@@ -18,21 +18,54 @@
 
 package org.apache.flink.table.gateway.service;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
 import org.apache.flink.table.gateway.api.session.SessionEnvironment;
 import org.apache.flink.table.gateway.api.session.SessionHandle;
 import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.operation.Operation;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
 import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.function.RunnableWithException;
 
+import org.assertj.core.api.Assertions;
 import org.hamcrest.Matchers;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** ITCase for {@link SqlGatewayServiceImpl}. */
 public class SqlGatewayServiceITCase extends AbstractTestBase {
@@ -41,10 +74,17 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
     public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
             new SqlGatewayServiceExtension();
 
+    private static SessionManager sessionManager;
     private static SqlGatewayServiceImpl service;
 
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+
     @BeforeAll
     public static void setUp() {
+        sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager();
         service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
     }
 
@@ -70,4 +110,346 @@ public class SqlGatewayServiceITCase extends AbstractTestBase {
                                 actualConfig,
                                 Matchers.hasEntry(k, v)));
     }
+
+    @Test
+    public void testFetchResultsInRunning() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                ResultSet.NOT_READY_RESULTS,
+                service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE));
+        endRunningLatch.countDown();
+    }
+
+    @Test
+    public void testGetOperationFinishedAndFetchResults() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+
+        endRunningLatch.countDown();
+        OperationInfo expectedInfo =
+                new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN, true);
+
+        CommonTestUtils.waitUtil(
+                () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo),
+                Duration.ofSeconds(10),
+                "Failed to wait for the operation to finish.");
+
+        Long token = 0L;
+        List<RowData> expectedData = getDefaultResultSet().getData();
+        List<RowData> actualData = new ArrayList<>();
+        while (token != null) {
+            ResultSet currentResult =
+                    service.fetchResults(sessionHandle, operationHandle, token, 1);
+            actualData.addAll(checkNotNull(currentResult.getData()));
+            token = currentResult.getNextToken();
+        }
+        assertEquals(expectedData, actualData);
+
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    @Test
+    public void testCancelOperation() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+
+        service.cancelOperation(sessionHandle, operationHandle);
+
+        assertEquals(
+                new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    @Test
+    public void testOperationGetErrorAndFetchError() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+
+        String msg = "Artificial Exception.";
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            throw new SqlExecutionException(msg);
+                        });
+        startRunningLatch.await();
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getOperationInfo(sessionHandle, operationHandle)
+                                .getStatus()
+                                .equals(OperationStatus.ERROR),
+                Duration.ofSeconds(10),
+                "Failed to get expected operation status.");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                service.fetchResults(
+                                        sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
+                .satisfies(anyCauseMatches(SqlExecutionException.class, msg));
+
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Concurrent tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testCancelOperationAndFetchResultInParallel() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        CountDownLatch latch = new CountDownLatch(1);
+        // Make sure the Operation is canceled before it finishes.
+        OperationHandle operationHandle = submitDefaultOperation(sessionHandle, latch::await);
+        runCancelOrCloseOperationWhenFetchResults(
+                sessionHandle,
+                operationHandle,
+                () -> service.cancelOperation(sessionHandle, operationHandle),
+                String.format(
+                        "Can not fetch results from the %s in %s status.",
+                        operationHandle, OperationStatus.CANCELED));
+        latch.countDown();
+    }
+
+    @Test
+    public void testCloseOperationAndFetchResultInParallel() {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            // allow close before execution finishes.
+                            Thread.sleep(1);
+                        });
+        runCancelOrCloseOperationWhenFetchResults(
+                sessionHandle,
+                operationHandle,
+                () -> service.closeOperation(sessionHandle, operationHandle),
+                String.format(
+                        "Can not find the submitted operation in the 
OperationManager with the %s.",
+                        operationHandle));
+    }
+
+    @Test
+    public void testCancelAndCloseOperationInParallel() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        int operationNum = 200;
+        List<Operation> operations = new ArrayList<>(operationNum);
+        for (int i = 0; i < operationNum; i++) {
+            boolean throwError = i % 2 == 0;
+            OperationHandle operationHandle =
+                    submitDefaultOperation(
+                            sessionHandle,
+                            () -> {
+                                // allow cancel/close before execution finishes.
+                                Thread.sleep(100);
+                                if (throwError) {
+                                    throw new SqlGatewayException("Artificial Exception.");
+                                }
+                            });
+
+            operations.add(
+                    service.getSession(sessionHandle)
+                            .getOperationManager()
+                            .getOperation(operationHandle));
+            new Thread(() -> service.cancelOperation(sessionHandle, operationHandle)).start();
+            new Thread(() -> service.closeOperation(sessionHandle, operationHandle)).start();
+        }
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getSession(sessionHandle).getOperationManager().getOperationCount()
+                                == 0,
+                Duration.ofSeconds(10),
+                "All operations should be closed.");
+
+        for (Operation op : operations) {
+            assertEquals(OperationStatus.CLOSED, op.getOperationInfo().getStatus());
+        }
+    }
+
+    @Test
+    public void testSubmitOperationAndCloseOperationManagerInParallel() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        OperationManager manager = service.getSession(sessionHandle).getOperationManager();
+        int submitThreadsNum = 100;
+        CountDownLatch latch = new CountDownLatch(submitThreadsNum);
+        for (int i = 0; i < submitThreadsNum; i++) {
+            new Thread(
+                            () -> {
+                                try {
+                                    submitDefaultOperation(sessionHandle, () -> {});
+                                } finally {
+                                    latch.countDown();
+                                }
+                            })
+                    .start();
+        }
+        manager.close();
+        latch.await();
+        assertEquals(0, manager.getOperationCount());
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Negative tests
+    // --------------------------------------------------------------------------------------------
+
+    @Test
+    public void testFetchResultsFromCanceledOperation() throws Exception {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch latch = new CountDownLatch(1);
+
+        OperationHandle operationHandle = submitDefaultOperation(sessionHandle, latch::await);
+        service.cancelOperation(sessionHandle, operationHandle);
+        assertThatThrownBy(
+                        () ->
+                                service.fetchResults(
+                                        sessionHandle, operationHandle, 0, Integer.MAX_VALUE))
+                .satisfies(
+                        anyCauseMatches(
+                                String.format(
+                                        "Can not fetch results from the %s in %s status.",
+                                        operationHandle, OperationStatus.CANCELED)));
+        latch.countDown();
+    }
+
+    @Test
+    public void testRequestNonExistOperation() {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+
+        OperationHandle operationHandle = OperationHandle.create();
+        List<RunnableWithException> requests =
+                Arrays.asList(
+                        () -> service.cancelOperation(sessionHandle, operationHandle),
+                        () -> service.getOperationInfo(sessionHandle, operationHandle),
+                        () ->
+                                service.fetchResults(
+                                        sessionHandle, operationHandle, 0, Integer.MAX_VALUE));
+
+        for (RunnableWithException request : requests) {
+            assertThatThrownBy(request::run)
+                    .satisfies(
+                            anyCauseMatches(
+                                    String.format(
+                                            "Can not find the submitted 
operation in the OperationManager with the %s.",
+                                            operationHandle)));
+        }
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private OperationHandle submitDefaultOperation(
+            SessionHandle sessionHandle, RunnableWithException executor) {
+        return service.submitOperation(
+                sessionHandle,
+                OperationType.UNKNOWN,
+                () -> {
+                    executor.run();
+                    return getDefaultResultSet();
+                });
+    }
+
+    private ResultSet getDefaultResultSet() {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.ofKind(INSERT, 1L, StringData.fromString("Flink CDC"), 3),
+                        GenericRowData.ofKind(INSERT, 2L, StringData.fromString("MySql"), null),
+                        GenericRowData.ofKind(DELETE, 1L, null, null),
+                        GenericRowData.ofKind(UPDATE_AFTER, 2L, null, 101));
+        return new ResultSet(
+                ResultSet.ResultType.PAYLOAD,
+                null,
+                ResolvedSchema.of(
+                        Column.physical("id", DataTypes.BIGINT()),
+                        Column.physical("name", DataTypes.STRING()),
+                        Column.physical("age", DataTypes.INT())),
+                data);
+    }
+
+    private void runCancelOrCloseOperationWhenFetchResults(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            RunnableWithException cancelOrClose,
+            String errorMsg) {
+
+        List<RowData> actual = new ArrayList<>();
+        new Thread(
+                        () -> {
+                            try {
+                                cancelOrClose.run();
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        })
+                .start();
+
+        assertThatThrownBy(
+                        () -> {
+                            Long token = 0L;
+                            while (token != null) {
+                                ResultSet resultSet =
+                                        service.fetchResults(
+                                                sessionHandle,
+                                                operationHandle,
+                                                token,
+                                                Integer.MAX_VALUE);
+                                token = resultSet.getNextToken();
+                                if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) {
+                                    actual.addAll(resultSet.getData());
+                                }
+                            }
+                        })
+                .satisfies(anyCauseMatches(errorMsg));
+
+        assertTrue(new HashSet<>(getDefaultResultSet().getData()).containsAll(actual));
+    }
 }
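
For orientation, the happy path these tests exercise can be condensed into one
sequence: open a session, submit a Callable, poll until FINISHED, fetch until the
next token is null, then close the operation. The sketch below follows the API
introduced by this commit; OperationLifecycleSketch is a hypothetical harness, and
error handling and timeouts are omitted.

    package org.apache.flink.table.gateway.service;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.catalog.ResolvedSchema;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.gateway.api.operation.OperationHandle;
    import org.apache.flink.table.gateway.api.operation.OperationStatus;
    import org.apache.flink.table.gateway.api.operation.OperationType;
    import org.apache.flink.table.gateway.api.results.ResultSet;
    import org.apache.flink.table.gateway.api.session.SessionEnvironment;
    import org.apache.flink.table.gateway.api.session.SessionHandle;
    import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    /** Hypothetical end-to-end walkthrough of the operation lifecycle. */
    final class OperationLifecycleSketch {

        static List<RowData> run(SqlGatewayServiceImpl service) throws Exception {
            SessionHandle session =
                    service.openSession(
                            SessionEnvironment.newBuilder()
                                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
                                    .build());

            // The submitted Callable runs on the OperationManager's executor
            // and returns the complete result set.
            OperationHandle operation =
                    service.submitOperation(
                            session,
                            OperationType.UNKNOWN,
                            () ->
                                    new ResultSet(
                                            ResultSet.ResultType.PAYLOAD,
                                            null,
                                            ResolvedSchema.of(
                                                    Column.physical("id", DataTypes.BIGINT())),
                                            Collections.singletonList(GenericRowData.of(1L))));

            // Poll until the background execution marks the operation FINISHED.
            while (service.getOperationInfo(session, operation).getStatus()
                    != OperationStatus.FINISHED) {
                Thread.sleep(50);
            }

            // Drain results batch by batch; a null next token signals the end.
            List<RowData> rows = new ArrayList<>();
            Long token = 0L;
            while (token != null) {
                ResultSet batch = service.fetchResults(session, operation, token, 100);
                if (batch.getResultType() == ResultSet.ResultType.PAYLOAD) {
                    rows.addAll(batch.getData());
                }
                token = batch.getNextToken();
            }

            service.closeOperation(session, operation);
            return rows;
        }
    }
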
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
new file mode 100644
index 00000000000..fc228241613
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.commons.collections.iterators.IteratorChain;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link ResultFetcher}. */
+public class ResultFetcherTest {
+
+    private static ResolvedSchema schema;
+    private static List<RowData> data;
+
+    @BeforeAll
+    public static void setUp() {
+        schema =
+                ResolvedSchema.of(
+                        Column.physical("boolean", DataTypes.BOOLEAN()),
+                        Column.physical("int", DataTypes.INT()),
+                        Column.physical("bigint", DataTypes.BIGINT()),
+                        Column.physical("varchar", DataTypes.STRING()),
+                        Column.physical("decimal(10, 5)", 
DataTypes.DECIMAL(10, 5)),
+                        Column.physical(
+                                "timestamp", 
DataTypes.TIMESTAMP(6).bridgedTo(Timestamp.class)),
+                        Column.physical("binary", DataTypes.BYTES()));
+        data =
+                Arrays.asList(
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                1,
+                                2L,
+                                "abc",
+                                BigDecimal.valueOf(1.23),
+                                Timestamp.valueOf("2020-03-01 18:39:14"),
+                                new byte[] {50, 51, 52, -123, 54, 93, 115, 126}),
+                        GenericRowData.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                false,
+                                null,
+                                0L,
+                                "",
+                                BigDecimal.valueOf(1),
+                                Timestamp.valueOf("2020-03-01 18:39:14.1"),
+                                new byte[] {100, -98, 32, 121, -125}),
+                        GenericRowData.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                Integer.MAX_VALUE,
+                                null,
+                                "abcdefg",
+                                BigDecimal.valueOf(12345),
+                                Timestamp.valueOf("2020-03-01 18:39:14.12"),
+                                new byte[] {-110, -23, 1, 2}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                false,
+                                Integer.MIN_VALUE,
+                                Long.MAX_VALUE,
+                                null,
+                                BigDecimal.valueOf(12345.06789),
+                                Timestamp.valueOf("2020-03-01 18:39:14.123"),
+                                new byte[] {50, 51, 52, -123, 54, 93, 115, 126}),
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                true,
+                                100,
+                                Long.MIN_VALUE,
+                                "abcdefg111",
+                                null,
+                                Timestamp.valueOf("2020-03-01 
18:39:14.123456"),
+                                new byte[] {110, 23, -1, -2}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "abcdefghijklmnopqrstuvwxyz",
+                                BigDecimal.valueOf(-12345.06789),
+                                null,
+                                null),
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                -1,
+                                -1L,
+                                "这是一段中文",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14"),
+                                new byte[] {-3, -2, -1, 0, 1, 2, 3}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "これは日本語をテストするための文です",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14"),
+                                new byte[] {-3, -2, -1, 0, 1, 2, 3}));
+    }
+
+    @Test
+    public void testFetchResultsMultipleTimesWithLimitedBufferSize() {
+        int bufferSize = data.size() / 2;
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
+
+        runFetchMultipleTimes(fetcher, bufferSize, data.size());
+    }
+
+    @Test
+    public void testFetchResultsMultipleTimesWithLimitedFetchSize() {
+        int bufferSize = data.size();
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
+
+        runFetchMultipleTimes(fetcher, bufferSize, data.size() / 2);
+    }
+
+    @Test
+    public void testFetchResultInParallel() throws Exception {
+        int bufferSize = data.size() / 2;
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
+
+        AtomicReference<Boolean> isEqual = new AtomicReference<>(true);
+        int fetchThreadNum = 100;
+        CountDownLatch latch = new CountDownLatch(fetchThreadNum);
+
+        CommonTestUtils.waitUtil(
+                () -> fetcher.getResultStore().getBufferedRecordSize() > 0,
+                Duration.ofSeconds(10),
+                "Failed to wait the buffer has data.");
+        List<RowData> firstFetch = fetcher.fetchResults(0, 
Integer.MAX_VALUE).getData();
+        for (int i = 0; i < fetchThreadNum; i++) {
+            new Thread(
+                            () -> {
+                                ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE);
+
+                                if (!firstFetch.equals(resultSet.getData())) {
+                                    isEqual.set(false);
+                                }
+                                latch.countDown();
+                            })
+                    .start();
+        }
+
+        latch.await();
+        assertTrue(isEqual.get());
+    }
+
+    @Test
+    public void testFetchResultAfterClose() throws Exception {
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), data.size() + 1);
+        List<RowData> actual = Collections.emptyList();
+        long token = 0L;
+
+        while (actual.isEmpty()) {
+            // fill the fetcher buffer
+            ResultSet resultSet = fetcher.fetchResults(token, 1);
+            token = checkNotNull(resultSet.getNextToken());
+            actual = resultSet.getData();
+        }
+        assertEquals(data.subList(0, 1), actual);
+        fetcher.close();
+
+        long testToken = token;
+        AtomicReference<Boolean> meetEnd = new AtomicReference<>(false);
+        new Thread(
+                        () -> {
+                            // Should meet EOS in the end.
+                            long nextToken = testToken;
+                            while (true) {
+                                ResultSet resultSet =
+                                        fetcher.fetchResults(nextToken, Integer.MAX_VALUE);
+                                if (resultSet.getResultType() == ResultSet.ResultType.EOS) {
+                                    break;
+                                }
+                                nextToken = checkNotNull(resultSet.getNextToken());
+                            }
+                            meetEnd.set(true);
+                        })
+                .start();
+
+        CommonTestUtils.waitUtil(
+                meetEnd::get,
+                Duration.ofSeconds(10),
+                "Should get EOS when fetch results from the closed fetcher.");
+    }
+
+    @Test
+    public void testFetchResultWithToken() {
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), data.size());
+        Long nextToken = 0L;
+        List<RowData> actual = new ArrayList<>();
+        ResultSet resultSetBefore = null;
+        while (nextToken != null) {
+            if (resultSetBefore != null) {
+                assertEquals(resultSetBefore, fetcher.fetchResults(nextToken - 1, data.size()));
+            }
+
+            ResultSet resultSet = fetcher.fetchResults(nextToken, data.size());
+            ResultSet resultSetWithSameToken = fetcher.fetchResults(nextToken, data.size());
+
+            assertEquals(resultSet, resultSetWithSameToken);
+            if (resultSet.getResultType() == ResultSet.ResultType.EOS) {
+                break;
+            }
+            resultSetBefore = resultSet;
+
+            actual.addAll(checkNotNull(resultSet.getData()));
+            nextToken = resultSet.getNextToken();
+        }
+
+        assertEquals(data, actual);
+    }
+
+    @Test
+    public void testFetchFailedResult() {
+        String message = "Artificial Exception";
+        ResultFetcher fetcher =
+                buildResultFetcher(
+                        Arrays.asList(new ErrorIterator(message), data.iterator()), data.size());
+
+        assertThatThrownBy(
+                        () -> {
+                            Long token = 0L;
+                            while (token != null) {
+                                // Use loop to fetch results from the ErrorIterator
+                                token =
+                                        fetcher.fetchResults(token, Integer.MAX_VALUE)
+                                                .getNextToken();
+                            }
+                        })
+                .satisfies(FlinkAssertions.anyCauseMatches(message));
+    }
+
+    @Test
+    public void testFetchIllegalToken() {
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), data.size());
+        assertThatThrownBy(() -> fetcher.fetchResults(2, Integer.MAX_VALUE))
+                .satisfies(FlinkAssertions.anyCauseMatches("Expecting token to be 0, but found 2"));
+    }
+
+    @Test
+    public void testFetchBeforeWithDifferentSize() throws Exception {
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
+        CommonTestUtils.waitUtil(
+                () -> fetcher.getResultStore().getBufferedRecordSize() > 1,
+                Duration.ofSeconds(10),
+                "Failed to make cached records num larger than 1.");
+
+        ResultSet firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE);
+        int firstFetchSize = firstFetch.getData().size();
+        assertThatThrownBy(() -> fetcher.fetchResults(0, firstFetchSize - 1))
+                .satisfies(
+                        FlinkAssertions.anyCauseMatches(
+                                String.format(
+                                        "As the same token is provided, fetch 
size must be not less than the previous returned buffer size."
+                                                + " Previous returned result 
size is %s, current max_fetch_size to be %s.",
+                                        firstFetch.getData().size(), 
firstFetchSize - 1)));
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    @SuppressWarnings("unchecked")
+    private ResultFetcher buildResultFetcher(List<Iterator<RowData>> rows, int bufferSize) {
+        OperationHandle operationHandle = OperationHandle.create();
+        return new ResultFetcher(
+                operationHandle,
+                schema,
+                CloseableIterator.adapterForIterator(new IteratorChain(rows)),
+                bufferSize);
+    }
+
+    private void runFetchMultipleTimes(ResultFetcher fetcher, int bufferSize, int fetchSize) {
+        List<RowData> fetchedRows = new ArrayList<>();
+        ResultSet currentResult = null;
+        Long token = 0L;
+
+        while (token != null) {
+            currentResult = fetcher.fetchResults(token, fetchSize);
+            assertTrue(
+                    checkNotNull(currentResult.getData()).size()
+                            <= Math.min(bufferSize, fetchSize));
+            token = currentResult.getNextToken();
+            fetchedRows.addAll(currentResult.getData());
+        }
+
+        assertEquals(ResultSet.ResultType.EOS, checkNotNull(currentResult).getResultType());
+        assertEquals(data, fetchedRows);
+    }
+
+    // --------------------------------------------------------------------------------------------
+
+    private static class ErrorIterator implements Iterator<RowData> {
+
+        private final String errorMsg;
+
+        public ErrorIterator(String errorMsg) {
+            this.errorMsg = errorMsg;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return true;
+        }
+
+        @Override
+        public RowData next() {
+            throw new SqlGatewayException(errorMsg);
+        }
+    }
+}
diff --git a/flink-table/flink-sql-gateway/src/test/resources/log4j2-test.properties b/flink-table/flink-sql-gateway/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..835c2ec9a3d
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF to avoid flooding the build logs;
+# set it manually to INFO for debugging purposes.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
