wuchong commented on code in PR #19823:
URL: https://github.com/apache/flink/pull/19823#discussion_r901117208


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS;
+
+/** Operation to manage the execution, results and so on. */
+public class Operation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
+
+    private final OperationHandle operationHandle;
+
+    private final OperationType operationType;
+    private final boolean hasResults;
+    private final AtomicReference<OperationStatus> status;
+
+    private final Callable<ResultFetcher> resultSupplier;
+
+    private Future<?> invocation;
+    private volatile ResultFetcher resultFetcher;
+    private volatile SqlExecutionException operationError;
+
+    /**
+     * Creates an operation whose status starts at {@link OperationStatus#INITIALIZED}.
+     *
+     * @param operationHandle handle identifying this operation; used in log and error messages.
+     * @param operationType type reported through {@link #getOperationInfo()}.
+     * @param resultSupplier invoked by {@link #run(ExecutorService)} to produce the {@link
+     *     ResultFetcher} that serves {@link #fetchResults(long, int)}.
+     */
+    public Operation(
+            OperationHandle operationHandle,
+            OperationType operationType,
+            Callable<ResultFetcher> resultSupplier) {
+        this.operationHandle = operationHandle;
+        this.status = new AtomicReference<>(OperationStatus.INITIALIZED);
+        this.operationType = operationType;
+        // Hard-coded: every operation currently reports that it has results.
+        this.hasResults = true;
+        this.resultSupplier = resultSupplier;
+    }
+
+    /** Switches the status to RUNNING; called by the submitted task before the supplier runs. */
+    void runBefore() {
+        updateState(OperationStatus.RUNNING);
+    }
+
+    /** Switches the status to FINISHED; called by the submitted task after the supplier returned. */
+    void runAfter() {
+        updateState(OperationStatus.FINISHED);
+    }
+
+    /**
+     * Moves the operation to PENDING and submits its execution to the given executor.
+     *
+     * <p>The submitted task switches the status to RUNNING, calls the result supplier, stores the
+     * returned {@link ResultFetcher} and finally switches the status to FINISHED. Any {@link
+     * Exception} raised in between (including a rejected status transition) is logged, wrapped in
+     * a {@link SqlExecutionException} kept in {@code operationError}, and the status is set to
+     * ERROR.
+     *
+     * @param service executor that runs the operation.
+     */
+    public void run(ExecutorService service) {
+        updateState(OperationStatus.PENDING);
+        // Keep the Future returned by the executor for the submitted task.
+        invocation =
+                service.submit(
+                        () -> {
+                            try {
+                                runBefore();
+                                // Store the fetcher before FINISHED becomes visible:
+                                // fetchResults() dereferences it once it observes FINISHED.
+                                resultFetcher = resultSupplier.call();
+                                runAfter();
+                            } catch (Exception e) {
+                                String msg =
+                                        String.format(
+                                                "Failed to execute the 
operation %s.",
+                                                operationHandle);
+                                LOG.error(msg, e);
+                                operationError = new 
SqlExecutionException(msg, e);
+                                // NOTE(review): this transition can itself be rejected by
+                                // updateState(); confirm which statuses may move to ERROR.
+                                updateState(OperationStatus.ERROR);
+                            }
+                        });
+    }
+
+    /**
+     * Switches the status to CANCELED and then calls {@code closeResources()}. If the transition
+     * is rejected by {@code updateState}, the exception propagates and {@code closeResources()} is
+     * not reached.
+     */
+    public void cancel() {
+        updateState(OperationStatus.CANCELED);
+        closeResources();
+    }
+
+    /**
+     * Switches the status to CLOSED and then calls {@code closeResources()}. If the transition is
+     * rejected by {@code updateState}, the exception propagates and {@code closeResources()} is not
+     * reached.
+     */
+    public void close() {
+        updateState(OperationStatus.CLOSED);
+        closeResources();
+    }
+
+    /**
+     * Returns results according to a single snapshot of the current status.
+     *
+     * <ul>
+     *   <li>ERROR: rethrows the stored {@code operationError}.
+     *   <li>FINISHED: delegates to {@link ResultFetcher#fetchResults(long, int)}.
+     *   <li>INITIALIZED, PENDING or RUNNING: returns {@code NOT_READY_RESULTS}.
+     *   <li>Any other status: throws a {@link SqlGatewayException}.
+     * </ul>
+     *
+     * @param token forwarded unchanged to the result fetcher.
+     * @param maxRows forwarded unchanged to the result fetcher.
+     * @return the fetched results, or {@code NOT_READY_RESULTS} while the operation has not
+     *     finished.
+     */
+    public ResultSet fetchResults(long token, int maxRows) {
+        // Read the status once so that every branch below decides on the same value.
+        OperationStatus currentStatus = status.get();
+
+        if (currentStatus == OperationStatus.ERROR) {
+            throw operationError;
+        } else if (currentStatus == OperationStatus.FINISHED) {
+            return resultFetcher.fetchResults(token, maxRows);
+        } else if (currentStatus == OperationStatus.RUNNING
+                || currentStatus == OperationStatus.PENDING
+                || currentStatus == OperationStatus.INITIALIZED) {
+            return NOT_READY_RESULTS;
+        } else {
+            throw new SqlGatewayException(
+                    String.format(
+                            "Can not fetch results from the %s in %s status.",
+                            operationHandle, currentStatus));
+        }
+    }
+
+    /** Returns the current status together with the operation type and the hasResults flag. */
+    public OperationInfo getOperationInfo() {
+        return new OperationInfo(status.get(), operationType, hasResults);
+    }
+
+    private void updateState(OperationStatus toStatus) {
+        OperationStatus currentStatus = status.get();
+        do {
+            boolean isValid = 
OperationStatus.isValidStatusTranslation(currentStatus, toStatus);
+            if (!isValid) {
+                String message =
+                        String.format(
+                                "Failed to convert the Operation Status from 
%s to %s for %s.",
+                                currentStatus, toStatus, operationHandle);
+                LOG.error(message);
+                throw new SqlGatewayException(message);
+            }
+            LOG.info(

Review Comment:
   Use `LOG.debug`.



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java:
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.results;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/** The collection of the results. */
+@PublicEvolving
+public class ResultSet {
+
+    private final ResultType resultType;
+
+    private final Long nextToken;
+
+    private final ResolvedSchema resultSchema;
+    private final List<RowData> data;
+
+    public static final ResultSet NOT_READY_RESULTS =
+            new ResultSet(
+                    ResultType.NOT_READY,
+                    0L,
+                    ResolvedSchema.of(Column.physical("status", 
DataTypes.STRING())),
+                    Collections.singletonList(GenericRowData.of("NOT READY")));

Review Comment:
   What about using empty results and an empty schema? It's redundant to fill the 
result with NOT READY, which I think users will never use. It's also a little 
confusing that a not-ready result set contains results. 



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.operation.Operation;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import 
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestResource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.assertj.core.api.Assertions;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** ITCase for {@link SqlGatewayServiceImpl}. */
+public class SqlGatewayServiceITCase extends AbstractTestBase {
+
+    @ClassRule
+    public static final SqlGatewayServiceTestResource 
SQL_GATEWAY_SERVICE_RESOURCE =
+            new SqlGatewayServiceTestResource();
+
+    private static SessionManager sessionManager;
+    private static SqlGatewayServiceImpl service;
+
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+
+    /** Caches the session manager and the service exposed by the class-level test resource. */
+    @BeforeClass
+    public static void setup() {
+        sessionManager = SQL_GATEWAY_SERVICE_RESOURCE.getSessionManager();
+        service = (SqlGatewayServiceImpl) 
SQL_GATEWAY_SERVICE_RESOURCE.getService();
+    }
+
+    /**
+     * Opens a session with two custom config entries and checks that each entry shows up in the
+     * session config returned by the service.
+     */
+    @Test
+    public void testOpenSessionWithConfig() {
+        Map<String, String> options = new HashMap<>();
+        options.put("key1", "val1");
+        options.put("key2", "val2");
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .addSessionConfig(options)
+                        .build();
+
+        SessionHandle sessionHandle = service.openSession(environment);
+        Map<String, String> actualConfig = 
service.getSessionConfig(sessionHandle);
+
+        // Only containment is asserted; the returned config may hold additional entries.
+        options.forEach(
+                (k, v) ->
+                        assertThat(
+                                String.format(
+                                        "Should contains (%s, %s) in the 
actual config.", k, v),
+                                actualConfig,
+                                Matchers.hasEntry(k, v)));
+    }
+
+    @Test
+    public void testFetchResultsInRunning() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                ResultSet.NOT_READY_RESULTS,
+                service.fetchResults(sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE));

Review Comment:
   Should we count down `endRunningLatch` at the end of the test? 



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/** Manager for the {@link Operation}. */
+public class OperationManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OperationManager.class);
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Map<OperationHandle, Operation> submittedOperations;
+    private final ExecutorService service;
+
+    private volatile boolean isRunning;
+
+    /**
+     * Creates a manager that starts in the running state with no submitted operations.
+     *
+     * @param service executor handed to {@link Operation#run(ExecutorService)} for every submitted
+     *     operation.
+     */
+    public OperationManager(ExecutorService service) {
+        this.service = service;
+        this.submittedOperations = new ConcurrentHashMap<>();
+        this.isRunning = true;
+    }
+
+    /**
+     * Submit the operation to the {@link OperationManager}. The {@link 
OperationManager} manges the
+     * lifecycle of the {@link Operation}, including register resources, fire 
the execution and so
+     * on.
+     *
+     * @param operationType The type of the submitted operation.
+     * @param executor Worker to execute.
+     * @return OperationHandle to fetch the results or check the status.
+     */
+    public OperationHandle submitOperation(
+            OperationType operationType, Callable<ResultSet> executor) {
+        OperationHandle handle = OperationHandle.create();
+        Operation operation =
+                new Operation(
+                        handle,
+                        operationType,
+                        () -> {
+                            ResultSet resultSet = executor.call();
+                            List<RowData> rows = resultSet.getData();
+                            return new ResultFetcher(
+                                    handle,
+                                    resultSet.getResultSchema(),
+                                    
CloseableIterator.adapterForIterator(rows.iterator()),
+                                    rows.size());
+                        });
+
+        lock.readLock().lock();
+        try {
+            if (!isRunning) {
+                throw new SqlGatewayException(
+                        "Failed to submit the operation because the 
OperationManager is closed.");
+            }
+            submittedOperations.put(handle, operation);
+            operation.run(service);
+        } finally {
+            lock.readLock().unlock();

Review Comment:
   It's a bit of a hack that `map.put` is protected by a read lock. How about using 
the read-write lock and a plain `HashMap` instead? 



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.commons.collections.iterators.IteratorChain;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link ResultFetcher}. */
+public class ResultFetcherTest {
+
+    private ResolvedSchema schema;
+    private List<RowData> data;
+
+    /**
+     * Builds the shared fixture: a seven-column schema (boolean, int, bigint, varchar, decimal,
+     * timestamp, binary) and eight rows. The rows use all four {@link RowKind} values, place a
+     * {@code null} in every column at least once, and include Chinese and Japanese strings.
+     */
+    @Before
+    public void setUp() {
+        schema =
+                ResolvedSchema.of(
+                        Column.physical("boolean", DataTypes.BOOLEAN()),
+                        Column.physical("int", DataTypes.INT()),
+                        Column.physical("bigint", DataTypes.BIGINT()),
+                        Column.physical("varchar", DataTypes.STRING()),
+                        Column.physical("decimal(10, 5)", 
DataTypes.DECIMAL(10, 5)),
+                        Column.physical(
+                                "timestamp", 
DataTypes.TIMESTAMP(6).bridgedTo(Timestamp.class)),
+                        Column.physical("binary", DataTypes.BYTES()));
+        data =
+                Arrays.asList(
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                1,
+                                2L,
+                                "abc",
+                                BigDecimal.valueOf(1.23),
+                                Timestamp.valueOf("2020-03-01 18:39:14"),
+                                new byte[] {50, 51, 52, -123, 54, 93, 115, 
126}),
+                        GenericRowData.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                false,
+                                null,
+                                0L,
+                                "",
+                                BigDecimal.valueOf(1),
+                                Timestamp.valueOf("2020-03-01 18:39:14.1"),
+                                new byte[] {100, -98, 32, 121, -125}),
+                        GenericRowData.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                Integer.MAX_VALUE,
+                                null,
+                                "abcdefg",
+                                BigDecimal.valueOf(12345),
+                                Timestamp.valueOf("2020-03-01 18:39:14.12"),
+                                new byte[] {-110, -23, 1, 2}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                false,
+                                Integer.MIN_VALUE,
+                                Long.MAX_VALUE,
+                                null,
+                                BigDecimal.valueOf(12345.06789),
+                                Timestamp.valueOf("2020-03-01 18:39:14.123"),
+                                new byte[] {50, 51, 52, -123, 54, 93, 115, 
126}),
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                true,
+                                100,
+                                Long.MIN_VALUE,
+                                "abcdefg111",
+                                null,
+                                Timestamp.valueOf("2020-03-01 
18:39:14.123456"),
+                                new byte[] {110, 23, -1, -2}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "abcdefghijklmnopqrstuvwxyz",
+                                BigDecimal.valueOf(-12345.06789),
+                                null,
+                                null),
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                -1,
+                                -1L,
+                                "这是一段中文",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14"),
+                                new byte[] {-3, -2, -1, 0, 1, 2, 3}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "これは日本語をテストするための文です",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14"),
+                                new byte[] {-3, -2, -1, 0, 1, 2, 3}));
+    }
+
+    /**
+     * Runs the multi-fetch scenario with a fetcher whose buffer size is half the number of rows.
+     * NOTE(review): the last argument of runFetchMultipleTimes is presumably the per-fetch row
+     * limit (here: all rows) - confirm against the helper.
+     */
+    @Test
+    public void testFetchResultsMultipleTimesWithLimitedBufferSize() {
+        int bufferSize = data.size() / 2;
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), 
bufferSize);
+
+        runFetchMultipleTimes(fetcher, bufferSize, data.size());
+    }
+
+    /**
+     * Runs the multi-fetch scenario with a fetcher whose buffer size equals the number of rows.
+     * NOTE(review): the last argument of runFetchMultipleTimes is presumably the per-fetch row
+     * limit (here: half the rows) - confirm against the helper.
+     */
+    @Test
+    public void testFetchResultsMultipleTimesWithLimitedFetchSize() {
+        int bufferSize = data.size();
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), 
bufferSize);
+
+        runFetchMultipleTimes(fetcher, bufferSize, data.size() / 2);
+    }
+
+    @Test
+    public void testFetchResultInParallel() {
+        int bufferSize = data.size() / 2;
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), 
bufferSize);
+
+        AtomicReference<Boolean> isEqual = new AtomicReference<>(true);
+
+        for (int i = 0; i < 10; i++) {
+            new Thread(
+                            () -> {
+                                ResultSet resultSet = fetcher.fetchResults(0, 
Integer.MAX_VALUE);
+
+                                if (!data.subList(0, 
bufferSize).equals(resultSet.getData())) {
+                                    isEqual.set(false);
+                                }
+                                fetcher.close();
+                            })
+                    .start();
+        }
+

Review Comment:
   Please wait for all the threads to finish (e.g. via `Thread#join()`) before 
asserting. Otherwise it's possible that not a single thread has been scheduled before the assertion runs. 



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java:
##########
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.core.testutils.FlinkAssertions;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.commons.collections.iterators.IteratorChain;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Test for {@link ResultFetcher}. */
+public class ResultFetcherTest {
+
+    private ResolvedSchema schema;
+    private List<RowData> data;
+
+    @Before
+    public void setUp() {
+        schema =
+                ResolvedSchema.of(
+                        Column.physical("boolean", DataTypes.BOOLEAN()),
+                        Column.physical("int", DataTypes.INT()),
+                        Column.physical("bigint", DataTypes.BIGINT()),
+                        Column.physical("varchar", DataTypes.STRING()),
+                        Column.physical("decimal(10, 5)", 
DataTypes.DECIMAL(10, 5)),
+                        Column.physical(
+                                "timestamp", 
DataTypes.TIMESTAMP(6).bridgedTo(Timestamp.class)),
+                        Column.physical("binary", DataTypes.BYTES()));
+        data =
+                Arrays.asList(
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                1,
+                                2L,
+                                "abc",
+                                BigDecimal.valueOf(1.23),
+                                Timestamp.valueOf("2020-03-01 18:39:14"),
+                                new byte[] {50, 51, 52, -123, 54, 93, 115, 
126}),
+                        GenericRowData.ofKind(
+                                RowKind.UPDATE_BEFORE,
+                                false,
+                                null,
+                                0L,
+                                "",
+                                BigDecimal.valueOf(1),
+                                Timestamp.valueOf("2020-03-01 18:39:14.1"),
+                                new byte[] {100, -98, 32, 121, -125}),
+                        GenericRowData.ofKind(
+                                RowKind.UPDATE_AFTER,
+                                true,
+                                Integer.MAX_VALUE,
+                                null,
+                                "abcdefg",
+                                BigDecimal.valueOf(12345),
+                                Timestamp.valueOf("2020-03-01 18:39:14.12"),
+                                new byte[] {-110, -23, 1, 2}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                false,
+                                Integer.MIN_VALUE,
+                                Long.MAX_VALUE,
+                                null,
+                                BigDecimal.valueOf(12345.06789),
+                                Timestamp.valueOf("2020-03-01 18:39:14.123"),
+                                new byte[] {50, 51, 52, -123, 54, 93, 115, 
126}),
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                true,
+                                100,
+                                Long.MIN_VALUE,
+                                "abcdefg111",
+                                null,
+                                Timestamp.valueOf("2020-03-01 
18:39:14.123456"),
+                                new byte[] {110, 23, -1, -2}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "abcdefghijklmnopqrstuvwxyz",
+                                BigDecimal.valueOf(-12345.06789),
+                                null,
+                                null),
+                        GenericRowData.ofKind(
+                                RowKind.INSERT,
+                                null,
+                                -1,
+                                -1L,
+                                "这是一段中文",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14"),
+                                new byte[] {-3, -2, -1, 0, 1, 2, 3}),
+                        GenericRowData.ofKind(
+                                RowKind.DELETE,
+                                null,
+                                -1,
+                                -1L,
+                                "これは日本語をテストするための文です",
+                                BigDecimal.valueOf(-12345.06789),
+                                Timestamp.valueOf("2020-03-04 18:39:14"),
+                                new byte[] {-3, -2, -1, 0, 1, 2, 3}));
+    }
+
+    @Test
+    public void testFetchResultsMultipleTimesWithLimitedBufferSize() {
+        int bufferSize = data.size() / 2;
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), 
bufferSize);
+
+        runFetchMultipleTimes(fetcher, bufferSize, data.size());
+    }
+
+    @Test
+    public void testFetchResultsMultipleTimesWithLimitedFetchSize() {
+        int bufferSize = data.size();
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), 
bufferSize);
+
+        runFetchMultipleTimes(fetcher, bufferSize, data.size() / 2);
+    }
+
+    @Test
+    public void testFetchResultInParallel() {
+        int bufferSize = data.size() / 2;
+        ResultFetcher fetcher =
+                buildResultFetcher(Collections.singletonList(data.iterator()), 
bufferSize);
+
+        AtomicReference<Boolean> isEqual = new AtomicReference<>(true);
+
+        for (int i = 0; i < 10; i++) {
+            new Thread(
+                            () -> {
+                                ResultSet resultSet = fetcher.fetchResults(0, 
Integer.MAX_VALUE);
+
+                                if (!data.subList(0, 
bufferSize).equals(resultSet.getData())) {
+                                    isEqual.set(false);
+                                }
+                                fetcher.close();

Review Comment:
   It would be better to close the fetcher after all the threads have finished. 
Otherwise, it is confusing that the fetcher still works after being closed. 



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A fetcher to fetch result from submitted statement.
+ *
+ * <p>The fetcher uses the {@link Iterator} model. It means every time fetch 
the result with the
+ * current token, the fetcher will move forward and retire the old data.
+ *
+ * <p>After closes, the fetcher will not fetch the results from the remote but 
is able to return all
+ * data in the local cache.
+ */
+public class ResultFetcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResultFetcher.class);
+
+    private final OperationHandle operationHandle;
+
+    private final ResolvedSchema resultSchema;
+    private final ResultStore resultStore;
+    private final LinkedList<RowData> bufferedResults = new LinkedList<>();
+    private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+
+    private volatile long currentToken = 0;
+    private volatile boolean noMoreResults = false;
+
+    public ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            CloseableIterator<RowData> resultRows,
+            int maxBufferSize) {
+        this.operationHandle = operationHandle;
+        this.resultSchema = resultSchema;
+        this.resultStore = new ResultStore(resultRows, maxBufferSize);
+    }
+
+    public void close() {
+        resultStore.close();
+    }
+
+    public synchronized ResultSet fetchResults(long token, int maxFetchSize) {

Review Comment:
   In the long term, we shouldn't allow concurrent `fetchResults` calls, because 
it is error-prone: a caller can easily fetch with an incorrect token. This should 
be resolved by [FLINK-28053](https://issues.apache.org/jira/browse/FLINK-28053) 
in the future. Maybe we can add a TODO comment for this? 



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A fetcher to fetch result from submitted statement.
+ *
+ * <p>The fetcher uses the {@link Iterator} model. It means every time fetch 
the result with the
+ * current token, the fetcher will move forward and retire the old data.
+ *
+ * <p>After closes, the fetcher will not fetch the results from the remote but 
is able to return all
+ * data in the local cache.
+ */
+public class ResultFetcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResultFetcher.class);
+
+    private final OperationHandle operationHandle;
+
+    private final ResolvedSchema resultSchema;
+    private final ResultStore resultStore;
+    private final LinkedList<RowData> bufferedResults = new LinkedList<>();
+    private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+
+    private volatile long currentToken = 0;
+    private volatile boolean noMoreResults = false;

Review Comment:
   `currentToken` and `noMoreResults` are already protected by the `synchronized` 
keyword on the `fetchResults` method, so there is no need to declare them as 
`volatile`.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.operation.Operation;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import 
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestResource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.assertj.core.api.Assertions;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** ITCase for {@link SqlGatewayServiceImpl}. */
+public class SqlGatewayServiceITCase extends AbstractTestBase {
+
+    @ClassRule
+    public static final SqlGatewayServiceTestResource 
SQL_GATEWAY_SERVICE_RESOURCE =
+            new SqlGatewayServiceTestResource();
+
+    private static SessionManager sessionManager;
+    private static SqlGatewayServiceImpl service;
+
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+
+    @BeforeClass
+    public static void setup() {
+        sessionManager = SQL_GATEWAY_SERVICE_RESOURCE.getSessionManager();
+        service = (SqlGatewayServiceImpl) 
SQL_GATEWAY_SERVICE_RESOURCE.getService();
+    }
+
+    @Test
+    public void testOpenSessionWithConfig() {
+        Map<String, String> options = new HashMap<>();
+        options.put("key1", "val1");
+        options.put("key2", "val2");
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .addSessionConfig(options)
+                        .build();
+
+        SessionHandle sessionHandle = service.openSession(environment);
+        Map<String, String> actualConfig = 
service.getSessionConfig(sessionHandle);
+
+        options.forEach(
+                (k, v) ->
+                        assertThat(
+                                String.format(
+                                        "Should contains (%s, %s) in the 
actual config.", k, v),
+                                actualConfig,
+                                Matchers.hasEntry(k, v)));
+    }
+
+    @Test
+    public void testFetchResultsInRunning() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                ResultSet.NOT_READY_RESULTS,
+                service.fetchResults(sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE));
+    }
+
+    @Test
+    public void testGetOperationFinishedAndFetchResults() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                new OperationInfo(OperationStatus.RUNNING, 
OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+
+        endRunningLatch.countDown();
+        OperationInfo expectedInfo =
+                new OperationInfo(OperationStatus.FINISHED, 
OperationType.UNKNOWN, true);
+
+        CommonTestUtils.waitUtil(
+                () -> service.getOperationInfo(sessionHandle, 
operationHandle).equals(expectedInfo),
+                Duration.ofSeconds(10),
+                "Failed to wait operation finish.");
+
+        Long token = 0L;
+        List<RowData> expectedData = getDefaultResultSet().getData();
+        List<RowData> actualData = new ArrayList<>();
+        while (token != null) {
+            ResultSet currentResult =
+                    service.fetchResults(sessionHandle, operationHandle, 
token, 1);
+            actualData.addAll(checkNotNull(currentResult.getData()));
+            token = currentResult.getNextToken();
+        }
+        assertEquals(expectedData, actualData);
+
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    @Test
+    public void testCancelOperation() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                new OperationInfo(OperationStatus.RUNNING, 
OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+
+        service.cancelOperation(sessionHandle, operationHandle);
+
+        assertEquals(
+                new OperationInfo(OperationStatus.CANCELED, 
OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    @Test
+    public void testOperationGetErrorAndFetchError() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+
+        String msg = "Artificial Exception.";
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            throw new SqlExecutionException(msg);
+                        });
+        startRunningLatch.await();
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getOperationInfo(sessionHandle, 
operationHandle)
+                                .getStatus()
+                                .equals(OperationStatus.ERROR),
+                Duration.ofSeconds(10),
+                "Failed to get expected operation status.");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                service.fetchResults(
+                                        sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE))
+                .satisfies(anyCauseMatches(SqlExecutionException.class, msg));
+
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Concurrent tests
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testCancelOperationAndFetchResultInParallel() throws Exception 
{
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            // allow cancel before execution finish.
+                            Thread.sleep(500);
+                        });
+        runCancelOrCloseOperationWhenFetchResults(
+                sessionHandle,
+                operationHandle,
+                () -> service.cancelOperation(sessionHandle, operationHandle),
+                String.format(
+                        "Can not fetch results from the %s in %s status.",
+                        operationHandle, OperationStatus.CANCELED));
+    }
+
+    @Test
+    public void testCloseOperationAndFetchResultInParallel() {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            // allow cancel before execution finish.
+                            Thread.sleep(500);
+                        });
+        runCancelOrCloseOperationWhenFetchResults(
+                sessionHandle,
+                operationHandle,
+                () -> service.closeOperation(sessionHandle, operationHandle),
+                String.format(
+                        "Can not find the submitted operation in the 
OperationManager with the %s.",
+                        operationHandle));
+    }
+
+    @Test
+    public void testCancelAndCloseOperationInParallel() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        int operationNum = 200;
+        List<Operation> operations = new ArrayList<>(operationNum);
+        for (int i = 0; i < operationNum; i++) {
+            boolean throwError = i % 2 == 0;
+            OperationHandle operationHandle =
+                    submitDefaultOperation(
+                            sessionHandle,
+                            () -> {
+                                // allow cancel/close before execution finish.
+                                Thread.sleep(100);
+                                if (throwError) {
+                                    throw new SqlGatewayException("Artificial 
Exception.");
+                                }
+                            });
+
+            operations.add(
+                    service.getSession(sessionHandle)
+                            .getOperationManager()
+                            .getOperation(operationHandle));
+            new Thread(() -> service.cancelOperation(sessionHandle, 
operationHandle)).start();
+            new Thread(() -> service.closeOperation(sessionHandle, 
operationHandle)).start();
+        }
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        
service.getSession(sessionHandle).getOperationManager().getOperationCount()
+                                == 0,
+                Duration.ofSeconds(10),
+                "All operation should be closed.");
+
+        for (Operation op : operations) {
+            assertEquals(OperationStatus.CLOSED, 
op.getOperationInfo().getStatus());
+        }
+    }
+
+    @Test
+    public void submitOperationAndCloseOperationManagerInParallel() throws 
Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        OperationManager manager = 
service.getSession(sessionHandle).getOperationManager();
+        int submitThreadsNum = 100;
+        CountDownLatch latch = new CountDownLatch(submitThreadsNum);
+        for (int i = 0; i < submitThreadsNum; i++) {
+            new Thread(
+                            () -> {
+                                try {
+                                    submitDefaultOperation(sessionHandle, () 
-> {});
+                                } finally {
+                                    latch.countDown();
+                                }
+                            })
+                    .start();
+        }
+        manager.close();
+        latch.await();
+        assertEquals(0, manager.getOperationCount());
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Negative tests
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testFetchResultsFromCanceledOperation() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        OperationHandle operationHandle = 
submitDefaultOperation(sessionHandle, () -> {});
+        service.cancelOperation(sessionHandle, operationHandle);
+        assertThatThrownBy(
+                        () ->
+                                service.fetchResults(
+                                        sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE))
+                .satisfies(
+                        anyCauseMatches(
+                                String.format(
+                                        "Can not fetch results from the %s in 
%s status.",
+                                        operationHandle, 
OperationStatus.CANCELED)));
+    }
+
+    @Test
+    public void testRequestNonExistOperation() {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        OperationHandle operationHandle = OperationHandle.create();
+        List<RunnableWithException> requests =
+                Arrays.asList(
+                        () -> service.cancelOperation(sessionHandle, 
operationHandle),
+                        () -> service.getOperationInfo(sessionHandle, 
operationHandle),
+                        () ->
+                                service.fetchResults(
+                                        sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE));
+
+        for (RunnableWithException request : requests) {
+            assertThatThrownBy(request::run)
+                    .satisfies(
+                            anyCauseMatches(
+                                    String.format(
+                                            "Can not find the submitted 
operation in the OperationManager with the %s.",
+                                            operationHandle)));
+        }
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+
+    private OperationHandle submitDefaultOperation(
+            SessionHandle sessionHandle, RunnableWithException executor) {
+        return service.submitOperation(
+                sessionHandle,
+                OperationType.UNKNOWN,
+                () -> {
+                    executor.run();
+                    return getDefaultResultSet();
+                });
+    }
+
+    private ResultSet getDefaultResultSet() {
+        List<RowData> data =
+                Arrays.asList(
+                        GenericRowData.ofKind(INSERT, 1L, 
StringData.fromString("Flink CDC"), 3),
+                        GenericRowData.ofKind(INSERT, 2L, 
StringData.fromString("MySql"), null),
+                        GenericRowData.ofKind(DELETE, 1, null, null),
+                        GenericRowData.ofKind(UPDATE_AFTER, 2, null, 101));
+        return new ResultSet(
+                ResultSet.ResultType.PAYLOAD,
+                null,
+                ResolvedSchema.of(
+                        Column.physical("id", DataTypes.BIGINT()),
+                        Column.physical("name", DataTypes.STRING()),
+                        Column.physical("age", DataTypes.INT())),
+                data);
+    }
+
+    private void runCancelOrCloseOperationWhenFetchResults(
+            SessionHandle sessionHandle,
+            OperationHandle operationHandle,
+            RunnableWithException cancelOrClose,
+            String errorMsg) {
+
+        List<RowData> actual = new ArrayList<>();
+        new Thread(
+                        () -> {
+                            try {
+                                Thread.sleep(500);
+                                cancelOrClose.run();
+                            } catch (Exception e) {
+                                // ignore
+                            }
+                        })
+                .start();
+        try {
+            Long token = 0L;
+            while (token != null) {
+                ResultSet resultSet =
+                        service.fetchResults(
+                                sessionHandle, operationHandle, token, 
Integer.MAX_VALUE);
+                token = resultSet.getNextToken();
+                if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) 
{
+                    actual.addAll(resultSet.getData());
+                }
+            }

Review Comment:
   Should the test fail here if the loop finishes without an exception? Is it 
allowed to not throw an exception?



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.operation.OperationStatus;
+import org.apache.flink.table.gateway.api.operation.OperationType;
+import org.apache.flink.table.gateway.api.results.OperationInfo;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.operation.Operation;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import 
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestResource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.assertj.core.api.Assertions;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static 
org.apache.flink.core.testutils.FlinkAssertions.assertThatChainOfCauses;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** ITCase for {@link SqlGatewayServiceImpl}. */
+public class SqlGatewayServiceITCase extends AbstractTestBase {
+
+    @ClassRule
+    public static final SqlGatewayServiceTestResource 
SQL_GATEWAY_SERVICE_RESOURCE =
+            new SqlGatewayServiceTestResource();
+
+    private static SessionManager sessionManager;
+    private static SqlGatewayServiceImpl service;
+
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+
+    @BeforeClass
+    public static void setup() {
+        sessionManager = SQL_GATEWAY_SERVICE_RESOURCE.getSessionManager();
+        service = (SqlGatewayServiceImpl) 
SQL_GATEWAY_SERVICE_RESOURCE.getService();
+    }
+
+    @Test
+    public void testOpenSessionWithConfig() {
+        Map<String, String> options = new HashMap<>();
+        options.put("key1", "val1");
+        options.put("key2", "val2");
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .addSessionConfig(options)
+                        .build();
+
+        SessionHandle sessionHandle = service.openSession(environment);
+        Map<String, String> actualConfig = 
service.getSessionConfig(sessionHandle);
+
+        options.forEach(
+                (k, v) ->
+                        assertThat(
+                                String.format(
+                                        "Should contains (%s, %s) in the 
actual config.", k, v),
+                                actualConfig,
+                                Matchers.hasEntry(k, v)));
+    }
+
+    @Test
+    public void testFetchResultsInRunning() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                ResultSet.NOT_READY_RESULTS,
+                service.fetchResults(sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE));
+    }
+
+    @Test
+    public void testGetOperationFinishedAndFetchResults() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                new OperationInfo(OperationStatus.RUNNING, 
OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+
+        endRunningLatch.countDown();
+        OperationInfo expectedInfo =
+                new OperationInfo(OperationStatus.FINISHED, 
OperationType.UNKNOWN, true);
+
+        CommonTestUtils.waitUtil(
+                () -> service.getOperationInfo(sessionHandle, 
operationHandle).equals(expectedInfo),
+                Duration.ofSeconds(10),
+                "Failed to wait operation finish.");
+
+        Long token = 0L;
+        List<RowData> expectedData = getDefaultResultSet().getData();
+        List<RowData> actualData = new ArrayList<>();
+        while (token != null) {
+            ResultSet currentResult =
+                    service.fetchResults(sessionHandle, operationHandle, 
token, 1);
+            actualData.addAll(checkNotNull(currentResult.getData()));
+            token = currentResult.getNextToken();
+        }
+        assertEquals(expectedData, actualData);
+
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    @Test
+    public void testCancelOperation() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+        CountDownLatch endRunningLatch = new CountDownLatch(1);
+
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            endRunningLatch.await();
+                        });
+
+        startRunningLatch.await();
+        assertEquals(
+                new OperationInfo(OperationStatus.RUNNING, 
OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+
+        service.cancelOperation(sessionHandle, operationHandle);
+
+        assertEquals(
+                new OperationInfo(OperationStatus.CANCELED, 
OperationType.UNKNOWN, true),
+                service.getOperationInfo(sessionHandle, operationHandle));
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    @Test
+    public void testOperationGetErrorAndFetchError() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        CountDownLatch startRunningLatch = new CountDownLatch(1);
+
+        String msg = "Artificial Exception.";
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            startRunningLatch.countDown();
+                            throw new SqlExecutionException(msg);
+                        });
+        startRunningLatch.await();
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        service.getOperationInfo(sessionHandle, 
operationHandle)
+                                .getStatus()
+                                .equals(OperationStatus.ERROR),
+                Duration.ofSeconds(10),
+                "Failed to get expected operation status.");
+
+        Assertions.assertThatThrownBy(
+                        () ->
+                                service.fetchResults(
+                                        sessionHandle, operationHandle, 0, 
Integer.MAX_VALUE))
+                .satisfies(anyCauseMatches(SqlExecutionException.class, msg));
+
+        service.closeOperation(sessionHandle, operationHandle);
+        assertEquals(0, sessionManager.getOperationCount(sessionHandle));
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Concurrent tests
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testCancelOperationAndFetchResultInParallel() throws Exception 
{
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            // allow cancel before execution finish.
+                            Thread.sleep(500);
+                        });
+        runCancelOrCloseOperationWhenFetchResults(
+                sessionHandle,
+                operationHandle,
+                () -> service.cancelOperation(sessionHandle, operationHandle),
+                String.format(
+                        "Can not fetch results from the %s in %s status.",
+                        operationHandle, OperationStatus.CANCELED));
+    }
+
+    @Test
+    public void testCloseOperationAndFetchResultInParallel() {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        OperationHandle operationHandle =
+                submitDefaultOperation(
+                        sessionHandle,
+                        () -> {
+                            // allow cancel before execution finish.
+                            Thread.sleep(500);
+                        });
+        runCancelOrCloseOperationWhenFetchResults(
+                sessionHandle,
+                operationHandle,
+                () -> service.closeOperation(sessionHandle, operationHandle),
+                String.format(
+                        "Can not find the submitted operation in the 
OperationManager with the %s.",
+                        operationHandle));
+    }
+
+    @Test
+    public void testCancelAndCloseOperationInParallel() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        int operationNum = 200;
+        List<Operation> operations = new ArrayList<>(operationNum);
+        for (int i = 0; i < operationNum; i++) {
+            boolean throwError = i % 2 == 0;
+            OperationHandle operationHandle =
+                    submitDefaultOperation(
+                            sessionHandle,
+                            () -> {
+                                // allow cancel/close before execution finish.
+                                Thread.sleep(100);
+                                if (throwError) {
+                                    throw new SqlGatewayException("Artificial 
Exception.");
+                                }
+                            });
+
+            operations.add(
+                    service.getSession(sessionHandle)
+                            .getOperationManager()
+                            .getOperation(operationHandle));
+            new Thread(() -> service.cancelOperation(sessionHandle, 
operationHandle)).start();
+            new Thread(() -> service.closeOperation(sessionHandle, 
operationHandle)).start();
+        }
+
+        CommonTestUtils.waitUtil(
+                () ->
+                        
service.getSession(sessionHandle).getOperationManager().getOperationCount()
+                                == 0,
+                Duration.ofSeconds(10),
+                "All operation should be closed.");
+
+        for (Operation op : operations) {
+            assertEquals(OperationStatus.CLOSED, 
op.getOperationInfo().getStatus());
+        }
+    }
+
+    @Test
+    public void submitOperationAndCloseOperationManagerInParallel() throws 
Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+        OperationManager manager = 
service.getSession(sessionHandle).getOperationManager();
+        int submitThreadsNum = 100;
+        CountDownLatch latch = new CountDownLatch(submitThreadsNum);
+        for (int i = 0; i < submitThreadsNum; i++) {
+            new Thread(
+                            () -> {
+                                try {
+                                    submitDefaultOperation(sessionHandle, () 
-> {});
+                                } finally {
+                                    latch.countDown();
+                                }
+                            })
+                    .start();
+        }
+        manager.close();
+        latch.await();
+        assertEquals(0, manager.getOperationCount());
+    }
+
+    // 
--------------------------------------------------------------------------------------------
+    // Negative tests
+    // 
--------------------------------------------------------------------------------------------
+
+    @Test
+    public void testFetchResultsFromCanceledOperation() throws Exception {
+        SessionHandle sessionHandle = 
service.openSession(defaultSessionEnvironment);
+
+        OperationHandle operationHandle = 
submitDefaultOperation(sessionHandle, () -> {});
+        service.cancelOperation(sessionHandle, operationHandle);

Review Comment:
   The operation might already be in FINISHED status before it is canceled. The 
question is: if the operation is already in FINISHED status, can it 
transition from FINISHED to CANCELED? According to the FLIP description, this 
is not allowed.



##########
flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationStatus.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.api.operation;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Status to describe the {@code Operation}. */
+@PublicEvolving
+public enum OperationStatus {
+    /** The operation is newly created. */
+    INITIALIZED(false),
+
+    /** Prepare the resources for the operation. */
+    PENDING(false),
+
+    /** The operation is running. */
+    RUNNING(false),
+
+    /** All the work is finished and ready for the client to fetch the 
results. */
+    FINISHED(true),
+
+    /** Operation has been cancelled. */
+    CANCELED(true),
+
+    /** Operation has been closed and all related resources are collected. */
+    CLOSED(true),
+
+    /** Some error happens. */
+    ERROR(true),
+
+    /** The execution of the operation timeout. */
+    TIMEOUT(true);
+
+    private final boolean isTerminalStatus;
+
+    OperationStatus(boolean isTerminalStatus) {
+        this.isTerminalStatus = isTerminalStatus;
+    }
+
+    public static boolean isValidStatusTranslation(

Review Comment:
   isValidStatusTranslation -> isValidStatusTransition



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to