wuchong commented on code in PR #19823:
URL: https://github.com/apache/flink/pull/19823#discussion_r897625718


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A fetcher to fetch result from submitted statement.
+ *
+ * <p>The fetcher uses the {@link Iterator} model. It means every time fetch 
the result with the
+ * current token, the fetcher will move forward and retire the old data.
+ */
+public class ResultFetcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResultFetcher.class);
+
+    public static final int FETCH_ALL = Integer.MAX_VALUE;
+
+    private final OperationHandle operationHandle;
+
+    private final ResolvedSchema resultSchema;
+    private final ResultStore resultStore;
+    private final LinkedList<RowData> bufferedResults = new LinkedList<>();
+    private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+
+    private long currentToken = 0;
+    private int previousMaxFetchSize = 0;
+    private boolean noMoreResults = false;
+
+    public ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            CloseableIterator<RowData> resultRows,
+            int maxBufferSize) {
+        this.operationHandle = operationHandle;
+        this.resultSchema = resultSchema;
+        this.resultStore = new ResultStore(resultRows, maxBufferSize);
+    }
+
+    public void close() {
+        resultStore.close();
+    }
+
+    public ResultSet fetchResults(long token, int maxFetchSize) {
+        if (maxFetchSize <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The max rows should be larger than 0 or use %s to 
fetch all available data.",

Review Comment:
   I think `FETCH_ALL` doesn't make sense here, because you just fetch Int.MAX 
rows in case of `FETCH_ALL`, not all the rows in the iterator. Why not remove 
the FETCH_ALL shortcut? 



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A fetcher to fetch result from submitted statement.
+ *
+ * <p>The fetcher uses the {@link Iterator} model. It means every time fetch 
the result with the
+ * current token, the fetcher will move forward and retire the old data.
+ */
+public class ResultFetcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResultFetcher.class);
+
+    public static final int FETCH_ALL = Integer.MAX_VALUE;
+
+    private final OperationHandle operationHandle;
+
+    private final ResolvedSchema resultSchema;
+    private final ResultStore resultStore;
+    private final LinkedList<RowData> bufferedResults = new LinkedList<>();
+    private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+
+    private long currentToken = 0;
+    private int previousMaxFetchSize = 0;
+    private boolean noMoreResults = false;
+
+    public ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            CloseableIterator<RowData> resultRows,
+            int maxBufferSize) {
+        this.operationHandle = operationHandle;
+        this.resultSchema = resultSchema;
+        this.resultStore = new ResultStore(resultRows, maxBufferSize);
+    }
+
+    public void close() {
+        resultStore.close();
+    }
+
+    public ResultSet fetchResults(long token, int maxFetchSize) {
+        if (maxFetchSize <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The max rows should be larger than 0 or use %s to 
fetch all available data.",
+                            FETCH_ALL));
+        }
+
+        if (token == currentToken) {
+            // equal to the Iterator.next()
+            if (noMoreResults) {
+                LOG.debug("There is no more result for operation: {}.", 
operationHandle);
+                return new ResultSet(
+                        ResultSet.ResultType.EOS, null, resultSchema, 
Collections.emptyList());
+            }
+
+            // a new token arrives, move the current buffer data into the prev 
buffered results.
+            bufferedPrevResults.clear();
+            if (bufferedResults.isEmpty()) {
+                // buffered results have been totally consumed,
+                // so try to fetch new results
+                Optional<List<RowData>> newResults = 
resultStore.retrieveRecords();
+                if (newResults.isPresent()) {
+                    bufferedResults.addAll(newResults.get());
+                } else {
+                    noMoreResults = true;
+                    return new ResultSet(
+                            ResultSet.ResultType.EOS, null, resultSchema, 
Collections.emptyList());
+                }
+            }
+
+            previousMaxFetchSize = maxFetchSize;
+            int resultSize;
+            if (maxFetchSize != FETCH_ALL) {
+                resultSize = Math.min(bufferedResults.size(), maxFetchSize);
+            } else {
+                resultSize = bufferedResults.size();
+            }
+
+            LOG.debug(
+                    "Fetching current result for operation: {}, token: {}, 
maxFetchSize: {}, realReturnSize: {}.",
+                    operationHandle,
+                    token,
+                    maxFetchSize,
+                    resultSize);
+
+            // move forward
+            currentToken++;
+            // move result to buffer
+            for (int i = 0; i < resultSize; i++) {
+                bufferedPrevResults.add(bufferedResults.removeFirst());
+            }
+            return new ResultSet(
+                    ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, 
bufferedPrevResults);
+        } else if (token == currentToken - 1 && token >= 0) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Fetching previous result for operation: {}, token: 
{}, maxFetchSize: {}",
+                        operationHandle,
+                        token,
+                        maxFetchSize);
+            }
+            if (previousMaxFetchSize != maxFetchSize) {
+                String msg =
+                        String.format(
+                                "As the same token is provided, fetch size 
must be the same. Expecting max_fetch_size to be %s, current max_fetch_size to 
be %s.",
+                                previousMaxFetchSize, maxFetchSize);

Review Comment:
   IMO, it would be fine as long as the `maxFetchSize >= previousResultSize`, 
where `previousResultSize` is the real size of the previous fetch result. 
`maxFetchSize` is a protection to avoid result set larger than expected. 



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/** Entity that contains the resource to run and the execution results. */
+public class Operation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final OperationHandle operationHandle;
+
+    private final OperationType operationType;
+    private final boolean hasResults;
+    private OperationStatus status;
+
+    private final Callable<ResultFetcher> resultSupplier;
+
+    private Future<?> invocation;
+    private ResultFetcher resultFetcher;
+    private SqlExecutionException operationError;
+
+    public Operation(
+            OperationHandle operationHandle,
+            OperationType operationType,
+            Callable<ResultFetcher> resultSupplier) {
+        this.operationHandle = operationHandle;
+        this.status = OperationStatus.INITIALIZED;
+        this.operationType = operationType;
+        this.hasResults = true;
+        this.resultSupplier = resultSupplier;
+    }
+
+    void runBefore() {
+        updateState(OperationStatus.PENDING);
+    }
+
+    void runAfter() {
+        updateState(OperationStatus.FINISHED);
+    }
+
+    public void run(ExecutorService service) {
+        invocation =
+                service.submit(
+                        () -> {
+                            try {
+                                runBefore();
+                                updateState(OperationStatus.RUNNING);
+                                ResultFetcher fetcher = resultSupplier.call();
+                                writeLock(() -> resultFetcher = fetcher);
+                                runAfter();
+                            } catch (Exception e) {
+                                String msg =
+                                        String.format(
+                                                "Failed to execute the 
operation %s.",
+                                                operationHandle);
+                                LOG.error(msg, e);
+                                writeLock(
+                                        () -> {
+                                            updateState(OperationStatus.ERROR);
+                                            operationError = new 
SqlExecutionException(msg, e);
+                                        });
+                            }
+                        });
+    }
+
+    public void cancel() {
+        writeLock(
+                () -> {
+                    if (invocation != null && !invocation.isDone()) {
+                        invocation.cancel(true);
+                    }
+
+                    if (resultFetcher != null) {
+                        resultFetcher.close();
+                        resultFetcher = null;
+                    }
+
+                    updateState(OperationStatus.CANCELED);
+                });
+    }
+
+    public void close() {
+        writeLock(
+                () -> {
+                    if (invocation != null && !invocation.isDone()) {
+                        invocation.cancel(true);
+                    }
+
+                    if (resultFetcher != null) {
+                        resultFetcher.close();
+                        resultFetcher = null;
+                    }
+
+                    updateState(OperationStatus.CLOSED);
+                });
+    }
+
+    public ResultSet fetchResults(long token, int maxRows) {
+        OperationStatus currentStatus = readLock(() -> status);
+
+        if (currentStatus == OperationStatus.ERROR) {
+            throw operationError;
+        } else if (currentStatus == OperationStatus.FINISHED) {
+            return resultFetcher.fetchResults(token, maxRows);
+        } else {

Review Comment:
   Why throw exceptions for all the other cases? I think it's very possible to 
call fetchResult before the operation is scheduled.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/** Entity that contains the resource to run and the execution results. */
+public class Operation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final OperationHandle operationHandle;
+
+    private final OperationType operationType;
+    private final boolean hasResults;
+    private OperationStatus status;
+
+    private final Callable<ResultFetcher> resultSupplier;
+
+    private Future<?> invocation;
+    private ResultFetcher resultFetcher;
+    private SqlExecutionException operationError;
+
+    public Operation(
+            OperationHandle operationHandle,
+            OperationType operationType,
+            Callable<ResultFetcher> resultSupplier) {
+        this.operationHandle = operationHandle;
+        this.status = OperationStatus.INITIALIZED;
+        this.operationType = operationType;
+        this.hasResults = true;
+        this.resultSupplier = resultSupplier;
+    }
+
+    void runBefore() {
+        updateState(OperationStatus.PENDING);
+    }
+
+    void runAfter() {
+        updateState(OperationStatus.FINISHED);
+    }
+
+    public void run(ExecutorService service) {
+        invocation =
+                service.submit(
+                        () -> {
+                            try {
+                                runBefore();
+                                updateState(OperationStatus.RUNNING);
+                                ResultFetcher fetcher = resultSupplier.call();
+                                writeLock(() -> resultFetcher = fetcher);

Review Comment:
   I'm thinking that can we use `AtomicReference` for `status` which may avoid 
so many locks. 



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/** Entity that contains the resource to run and the execution results. */
+public class Operation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final OperationHandle operationHandle;
+
+    private final OperationType operationType;
+    private final boolean hasResults;
+    private OperationStatus status;
+
+    private final Callable<ResultFetcher> resultSupplier;
+
+    private Future<?> invocation;
+    private ResultFetcher resultFetcher;
+    private SqlExecutionException operationError;
+
+    public Operation(
+            OperationHandle operationHandle,
+            OperationType operationType,
+            Callable<ResultFetcher> resultSupplier) {
+        this.operationHandle = operationHandle;
+        this.status = OperationStatus.INITIALIZED;
+        this.operationType = operationType;
+        this.hasResults = true;
+        this.resultSupplier = resultSupplier;
+    }
+
+    void runBefore() {
+        updateState(OperationStatus.PENDING);
+    }
+
+    void runAfter() {
+        updateState(OperationStatus.FINISHED);
+    }
+
+    public void run(ExecutorService service) {
+        invocation =
+                service.submit(
+                        () -> {
+                            try {
+                                runBefore();

Review Comment:
   The operation should switch to pending before schduled. 



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/** Entity that contains the resource to run and the execution results. */
+public class Operation {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Operation.class);
+
+    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private final OperationHandle operationHandle;
+
+    private final OperationType operationType;
+    private final boolean hasResults;
+    private OperationStatus status;
+
+    private final Callable<ResultFetcher> resultSupplier;
+
+    private Future<?> invocation;
+    private ResultFetcher resultFetcher;
+    private SqlExecutionException operationError;
+
+    public Operation(
+            OperationHandle operationHandle,
+            OperationType operationType,
+            Callable<ResultFetcher> resultSupplier) {
+        this.operationHandle = operationHandle;
+        this.status = OperationStatus.INITIALIZED;
+        this.operationType = operationType;
+        this.hasResults = true;
+        this.resultSupplier = resultSupplier;
+    }
+
+    void runBefore() {
+        updateState(OperationStatus.PENDING);
+    }
+
+    void runAfter() {
+        updateState(OperationStatus.FINISHED);
+    }
+
+    public void run(ExecutorService service) {
+        invocation =
+                service.submit(
+                        () -> {
+                            try {
+                                runBefore();
+                                updateState(OperationStatus.RUNNING);
+                                ResultFetcher fetcher = resultSupplier.call();
+                                writeLock(() -> resultFetcher = fetcher);
+                                runAfter();
+                            } catch (Exception e) {
+                                String msg =
+                                        String.format(
+                                                "Failed to execute the 
operation %s.",
+                                                operationHandle);
+                                LOG.error(msg, e);
+                                writeLock(
+                                        () -> {
+                                            updateState(OperationStatus.ERROR);
+                                            operationError = new 
SqlExecutionException(msg, e);
+                                        });
+                            }
+                        });
+    }
+
+    public void cancel() {
+        writeLock(
+                () -> {
+                    if (invocation != null && !invocation.isDone()) {
+                        invocation.cancel(true);
+                    }
+
+                    if (resultFetcher != null) {
+                        resultFetcher.close();
+                        resultFetcher = null;
+                    }
+
+                    updateState(OperationStatus.CANCELED);
+                });
+    }
+
+    public void close() {
+        writeLock(
+                () -> {
+                    if (invocation != null && !invocation.isDone()) {
+                        invocation.cancel(true);
+                    }
+
+                    if (resultFetcher != null) {
+                        resultFetcher.close();
+                        resultFetcher = null;
+                    }
+
+                    updateState(OperationStatus.CLOSED);
+                });
+    }
+
+    public ResultSet fetchResults(long token, int maxRows) {
+        OperationStatus currentStatus = readLock(() -> status);
+
+        if (currentStatus == OperationStatus.ERROR) {
+            throw operationError;
+        } else if (currentStatus == OperationStatus.FINISHED) {
+            return resultFetcher.fetchResults(token, maxRows);

Review Comment:
   The `resultFetcher` may be under closing when calling `fetchResult`. We 
should add a `volatile boolean closed` flag in ResultFetcher to avoid race 
conditions. 



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.gateway.common.SqlGatewayService;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import 
org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestResource;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.util.function.FunctionWithException;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.assertj.core.api.Assertions;
+import org.hamcrest.Matchers;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches;
+import static org.apache.flink.types.RowKind.DELETE;
+import static org.apache.flink.types.RowKind.INSERT;
+import static org.apache.flink.types.RowKind.UPDATE_AFTER;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** ITCase for {@link SqlGatewayServiceImpl}. */
+public class SqlGatewayServiceITCase extends AbstractTestBase {
+
+    @ClassRule
+    public static final SqlGatewayServiceTestResource 
SQL_GATEWAY_SERVICE_RESOURCE =
+            new SqlGatewayServiceTestResource();
+
+    private static SessionManager sessionManager;
+    private static SqlGatewayService service;
+
+    private final SessionEnvironment defaultSessionEnvironment =
+            SessionEnvironment.newBuilder()
+                    .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                    .build();
+
+    @BeforeClass
+    public static void setup() {
+        sessionManager = SQL_GATEWAY_SERVICE_RESOURCE.getSessionManager();
+        service = SQL_GATEWAY_SERVICE_RESOURCE.getService();
+    }
+
+    @Test
+    public void testOpenSessionWithConfig() {
+        Map<String, String> options = new HashMap<>();
+        options.put("key1", "val1");
+        options.put("key2", "val2");
+        SessionEnvironment environment =
+                SessionEnvironment.newBuilder()
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .addSessionConfig(options)
+                        .build();
+
+        SessionHandle sessionHandle = service.openSession(environment);
+        Map<String, String> actualConfig = 
service.getSessionConfig(sessionHandle);
+
+        assertThat(
+                actualConfig,
+                allOf(
+                        options.entrySet().stream()
+                                .map(entry -> 
Matchers.hasEntry(entry.getKey(), entry.getValue()))
+                                .collect(Collectors.toList())));
+    }

Review Comment:
   I was thinking we can simply assert by `assertEquals(actualConfig, 
options)`. However, it seems there are more entries in `actualConfig`. If that 
is the case, we can use `IsMapContaining`.
   
   ```java
   options.forEach((k, v) -> assertThat(actualConfig, 
IsMapContaining.hasEntry(k, v)));
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java:
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.result;
+
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.util.CloseableIterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * A fetcher to fetch result from submitted statement.
+ *
+ * <p>The fetcher uses the {@link Iterator} model. It means every time fetch 
the result with the
+ * current token, the fetcher will move forward and retire the old data.
+ */
+public class ResultFetcher {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ResultFetcher.class);
+
+    public static final int FETCH_ALL = Integer.MAX_VALUE;
+
+    private final OperationHandle operationHandle;
+
+    private final ResolvedSchema resultSchema;
+    private final ResultStore resultStore;
+    private final LinkedList<RowData> bufferedResults = new LinkedList<>();
+    private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>();
+
+    private long currentToken = 0;
+    private int previousMaxFetchSize = 0;
+    private boolean noMoreResults = false;
+
+    public ResultFetcher(
+            OperationHandle operationHandle,
+            ResolvedSchema resultSchema,
+            CloseableIterator<RowData> resultRows,
+            int maxBufferSize) {
+        this.operationHandle = operationHandle;
+        this.resultSchema = resultSchema;
+        this.resultStore = new ResultStore(resultRows, maxBufferSize);
+    }
+
+    public void close() {
+        resultStore.close();
+    }
+
+    public ResultSet fetchResults(long token, int maxFetchSize) {
+        if (maxFetchSize <= 0) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "The max rows should be larger than 0 or use %s to 
fetch all available data.",
+                            FETCH_ALL));
+        }
+
+        if (token == currentToken) {
+            // equal to the Iterator.next()
+            if (noMoreResults) {
+                LOG.debug("There is no more result for operation: {}.", 
operationHandle);
+                return new ResultSet(
+                        ResultSet.ResultType.EOS, null, resultSchema, 
Collections.emptyList());
+            }
+
+            // a new token arrives, move the current buffer data into the prev 
buffered results.
+            bufferedPrevResults.clear();
+            if (bufferedResults.isEmpty()) {
+                // buffered results have been totally consumed,
+                // so try to fetch new results
+                Optional<List<RowData>> newResults = 
resultStore.retrieveRecords();
+                if (newResults.isPresent()) {
+                    bufferedResults.addAll(newResults.get());
+                } else {
+                    noMoreResults = true;
+                    return new ResultSet(
+                            ResultSet.ResultType.EOS, null, resultSchema, 
Collections.emptyList());
+                }
+            }
+
+            previousMaxFetchSize = maxFetchSize;
+            int resultSize;
+            if (maxFetchSize != FETCH_ALL) {
+                resultSize = Math.min(bufferedResults.size(), maxFetchSize);
+            } else {
+                resultSize = bufferedResults.size();
+            }
+
+            LOG.debug(
+                    "Fetching current result for operation: {}, token: {}, 
maxFetchSize: {}, realReturnSize: {}.",

Review Comment:
   ```suggestion
                       "Fetching current result for operation: {}, token: {}, 
maxFetchSize: {}, resultSize: {}.",
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+
+import java.io.Closeable;
+import java.util.Map;
+
+/** Session that manages the registered resource including jars, registered 
table. */
+public class Session implements Closeable {
+
+    private final SessionContext sessionContext;
+    private long lastAccessTime;

Review Comment:
   Ticket: https://issues.apache.org/jira/browse/FLINK-28053



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.operation;
+
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationStatus;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.result.ExecutionResult;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
+
+/** Entity that contains the resource to run and the execution results. */
+public class Operation {

Review Comment:
   ????



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/session/SessionEnvironment.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common.session;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.gateway.common.endpoint.EndpointVersion;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Environment to initialize the {@code Session}. */
+@PublicEvolving
+public class SessionEnvironment {
+    private final @Nullable String sessionName;
+    private final EndpointVersion version;
+    private final List<String> libs;
+    private final List<String> jars;
+    private final Map<String, String> sessionConfig;
+
+    @VisibleForTesting
+    SessionEnvironment(
+            @Nullable String sessionName,
+            EndpointVersion version,
+            List<String> libs,
+            List<String> jars,
+            Map<String, String> sessionConfig) {
+        this.sessionName = sessionName;
+        this.version = version;
+        this.libs = checkNotNull(libs);
+        this.jars = checkNotNull(jars);

Review Comment:
   I mean shall we check whether it's a valid path, valid jar file, or existing 
path/file?



##########
flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/SqlGatewayService.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.gateway.common.operation.OperationHandle;
+import org.apache.flink.table.gateway.common.operation.OperationType;
+import org.apache.flink.table.gateway.common.results.OperationInfo;
+import org.apache.flink.table.gateway.common.results.ResultSet;
+import org.apache.flink.table.gateway.common.session.SessionEnvironment;
+import org.apache.flink.table.gateway.common.session.SessionHandle;
+import org.apache.flink.table.gateway.common.utils.SqlGatewayException;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+/**
+ * A service of SQL gateway is responsible for talking requests from the 
endpoints. from the
+ * endpoints.

Review Comment:
   ```suggestion
    * A service of SQL gateway is responsible for handling requests from the 
endpoints.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to