wuchong commented on code in PR #19823: URL: https://github.com/apache/flink/pull/19823#discussion_r897625718
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +/** + * A fetcher to fetch result from submitted statement. + * + * <p>The fetcher uses the {@link Iterator} model. It means every time fetch the result with the + * current token, the fetcher will move forward and retire the old data. + */ +public class ResultFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class); + + public static final int FETCH_ALL = Integer.MAX_VALUE; + + private final OperationHandle operationHandle; + + private final ResolvedSchema resultSchema; + private final ResultStore resultStore; + private final LinkedList<RowData> bufferedResults = new LinkedList<>(); + private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>(); + + private long currentToken = 0; + private int previousMaxFetchSize = 0; + private boolean noMoreResults = false; + + public ResultFetcher( + OperationHandle operationHandle, + ResolvedSchema resultSchema, + CloseableIterator<RowData> resultRows, + int maxBufferSize) { + this.operationHandle = operationHandle; + this.resultSchema = resultSchema; + this.resultStore = new ResultStore(resultRows, maxBufferSize); + } + + public void close() { + resultStore.close(); + } + + public ResultSet fetchResults(long token, int maxFetchSize) { + if (maxFetchSize <= 0) { + throw new IllegalArgumentException( + String.format( + "The max rows should be larger than 0 or use %s to fetch all available data.", Review Comment: I think `FETCH_ALL` doesn't make sense here, because you just fetch Int.MAX rows in case of `FETCH_ALL`, not all the rows in the iterator. Why not remove the FETCH_ALL shortcut? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +/** + * A fetcher to fetch result from submitted statement. + * + * <p>The fetcher uses the {@link Iterator} model. It means every time fetch the result with the + * current token, the fetcher will move forward and retire the old data. + */ +public class ResultFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class); + + public static final int FETCH_ALL = Integer.MAX_VALUE; + + private final OperationHandle operationHandle; + + private final ResolvedSchema resultSchema; + private final ResultStore resultStore; + private final LinkedList<RowData> bufferedResults = new LinkedList<>(); + private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>(); + + private long currentToken = 0; + private int previousMaxFetchSize = 0; + private boolean noMoreResults = false; + + public ResultFetcher( + OperationHandle operationHandle, + ResolvedSchema resultSchema, + CloseableIterator<RowData> resultRows, + int maxBufferSize) { + this.operationHandle = operationHandle; + this.resultSchema = resultSchema; + this.resultStore = new ResultStore(resultRows, maxBufferSize); + } + + public void close() { + resultStore.close(); + } + + public ResultSet fetchResults(long token, int maxFetchSize) { + if (maxFetchSize <= 0) { + throw new IllegalArgumentException( + String.format( + "The max rows should be larger than 0 or use %s to fetch all available data.", + FETCH_ALL)); + } + + if (token == currentToken) { + // equal to the Iterator.next() + if (noMoreResults) { + LOG.debug("There is no more result for operation: {}.", operationHandle); + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + + // a new token arrives, move the current buffer data into the prev buffered results. 
+ bufferedPrevResults.clear(); + if (bufferedResults.isEmpty()) { + // buffered results have been totally consumed, + // so try to fetch new results + Optional<List<RowData>> newResults = resultStore.retrieveRecords(); + if (newResults.isPresent()) { + bufferedResults.addAll(newResults.get()); + } else { + noMoreResults = true; + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + } + + previousMaxFetchSize = maxFetchSize; + int resultSize; + if (maxFetchSize != FETCH_ALL) { + resultSize = Math.min(bufferedResults.size(), maxFetchSize); + } else { + resultSize = bufferedResults.size(); + } + + LOG.debug( + "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, realReturnSize: {}.", + operationHandle, + token, + maxFetchSize, + resultSize); + + // move forward + currentToken++; + // move result to buffer + for (int i = 0; i < resultSize; i++) { + bufferedPrevResults.add(bufferedResults.removeFirst()); + } + return new ResultSet( + ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, bufferedPrevResults); + } else if (token == currentToken - 1 && token >= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Fetching previous result for operation: {}, token: {}, maxFetchSize: {}", + operationHandle, + token, + maxFetchSize); + } + if (previousMaxFetchSize != maxFetchSize) { + String msg = + String.format( + "As the same token is provided, fetch size must be the same. Expecting max_fetch_size to be %s, current max_fetch_size to be %s.", + previousMaxFetchSize, maxFetchSize); Review Comment: IMO, it would be fine as long as the `maxFetchSize >= previousResultSize`, where `previousResultSize` is the real size of the previous fetch result. `maxFetchSize` is a protection to avoid result set larger than expected. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** Entity that contains the resource to run and the execution results. */ +public class Operation { + + private static final Logger LOG = LoggerFactory.getLogger(Operation.class); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private final OperationHandle operationHandle; + + private final OperationType operationType; + private final boolean hasResults; + private OperationStatus status; + + private final Callable<ResultFetcher> resultSupplier; + + private Future<?> invocation; + private ResultFetcher resultFetcher; + private SqlExecutionException operationError; + + public Operation( + OperationHandle operationHandle, + OperationType operationType, + Callable<ResultFetcher> resultSupplier) { + this.operationHandle = operationHandle; + this.status = OperationStatus.INITIALIZED; + this.operationType = operationType; + this.hasResults = true; + this.resultSupplier = resultSupplier; + } + + void runBefore() { + updateState(OperationStatus.PENDING); + } + + void runAfter() { + updateState(OperationStatus.FINISHED); + } + + public void run(ExecutorService service) { + invocation = + service.submit( + () -> { + try { + runBefore(); + updateState(OperationStatus.RUNNING); + ResultFetcher fetcher = resultSupplier.call(); + writeLock(() -> resultFetcher = fetcher); + runAfter(); + } catch (Exception e) { + String msg = + String.format( + "Failed to execute the operation %s.", + operationHandle); + LOG.error(msg, e); + writeLock( + () -> { + updateState(OperationStatus.ERROR); + operationError = new SqlExecutionException(msg, e); + }); + } + }); + } + + public void cancel() { + writeLock( + () -> { + if (invocation != null && !invocation.isDone()) { + invocation.cancel(true); + } + + if (resultFetcher != null) { + resultFetcher.close(); + resultFetcher = null; + } + + updateState(OperationStatus.CANCELED); + }); + } + + public void close() { + writeLock( + () -> { + if (invocation != null && !invocation.isDone()) { + invocation.cancel(true); + } + + if (resultFetcher != null) { + resultFetcher.close(); + resultFetcher = null; + } + + updateState(OperationStatus.CLOSED); + }); + } + + public ResultSet fetchResults(long token, int maxRows) { + OperationStatus currentStatus = readLock(() -> status); + + if (currentStatus == OperationStatus.ERROR) { + throw operationError; + } else if (currentStatus == OperationStatus.FINISHED) { + return resultFetcher.fetchResults(token, maxRows); + } else { Review Comment: Why throw exceptions for all the other cases? I think it's very possible to call fetchResult before the operation is scheduled. 
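For what it's worth, a minimal sketch of that alternative: only terminal states fail, and anything earlier returns an empty "not ready" answer so the client can poll again. This assumes a `NOT_READY` member on `ResultSet.ResultType` and a `java.util.Collections` import, neither of which appears in the hunk above.

```java
public ResultSet fetchResults(long token, int maxRows) {
    OperationStatus currentStatus = readLock(() -> status);

    if (currentStatus == OperationStatus.ERROR) {
        throw operationError;
    } else if (currentStatus == OperationStatus.FINISHED) {
        return resultFetcher.fetchResults(token, maxRows);
    } else if (currentStatus == OperationStatus.CANCELED
            || currentStatus == OperationStatus.CLOSED) {
        // Terminal states that will never produce results: fail fast.
        throw new SqlGatewayException(
                String.format(
                        "Can not fetch results from the %s operation %s.",
                        currentStatus, operationHandle));
    } else {
        // INITIALIZED / PENDING / RUNNING: the operation has not produced
        // results yet. Hand back the same token with no rows (the schema is
        // unknown at this point, hence null) instead of failing the request.
        return new ResultSet(
                ResultSet.ResultType.NOT_READY, token, null, Collections.emptyList());
    }
}
```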
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** Entity that contains the resource to run and the execution results. */ +public class Operation { + + private static final Logger LOG = LoggerFactory.getLogger(Operation.class); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private final OperationHandle operationHandle; + + private final OperationType operationType; + private final boolean hasResults; + private OperationStatus status; + + private final Callable<ResultFetcher> resultSupplier; + + private Future<?> invocation; + private ResultFetcher resultFetcher; + private SqlExecutionException operationError; + + public Operation( + OperationHandle operationHandle, + OperationType operationType, + Callable<ResultFetcher> resultSupplier) { + this.operationHandle = operationHandle; + this.status = OperationStatus.INITIALIZED; + this.operationType = operationType; + this.hasResults = true; + this.resultSupplier = resultSupplier; + } + + void runBefore() { + updateState(OperationStatus.PENDING); + } + + void runAfter() { + updateState(OperationStatus.FINISHED); + } + + public void run(ExecutorService service) { + invocation = + service.submit( + () -> { + try { + runBefore(); + updateState(OperationStatus.RUNNING); + ResultFetcher fetcher = resultSupplier.call(); + writeLock(() -> resultFetcher = fetcher); Review Comment: I'm wondering whether we can use an `AtomicReference` for `status`, which may avoid so many locks.
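A rough sketch of that idea, where `isValidStatusTransition` is an assumed helper on `OperationStatus` rather than something in this PR:

```java
import java.util.concurrent.atomic.AtomicReference;

// The plain "private OperationStatus status" field (guarded by the
// ReentrantReadWriteLock) becomes an AtomicReference:
private final AtomicReference<OperationStatus> status =
        new AtomicReference<>(OperationStatus.INITIALIZED);

private void updateState(OperationStatus toStatus) {
    OperationStatus fromStatus;
    do {
        fromStatus = status.get();
        // Assumed helper that rejects illegal moves such as FINISHED -> RUNNING.
        if (!OperationStatus.isValidStatusTransition(fromStatus, toStatus)) {
            throw new SqlGatewayException(
                    String.format(
                            "Can not convert the status of operation %s from %s to %s.",
                            operationHandle, fromStatus, toStatus));
        }
        // Another thread may have changed the status between get() and the
        // compareAndSet, in which case we re-read and re-validate.
    } while (!status.compareAndSet(fromStatus, toStatus));
}
```

This only removes the locking around pure status transitions; `resultFetcher` and `operationError` would still need their own visibility guarantees (for example `volatile`).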
########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** Entity that contains the resource to run and the execution results. */ +public class Operation { + + private static final Logger LOG = LoggerFactory.getLogger(Operation.class); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private final OperationHandle operationHandle; + + private final OperationType operationType; + private final boolean hasResults; + private OperationStatus status; + + private final Callable<ResultFetcher> resultSupplier; + + private Future<?> invocation; + private ResultFetcher resultFetcher; + private SqlExecutionException operationError; + + public Operation( + OperationHandle operationHandle, + OperationType operationType, + Callable<ResultFetcher> resultSupplier) { + this.operationHandle = operationHandle; + this.status = OperationStatus.INITIALIZED; + this.operationType = operationType; + this.hasResults = true; + this.resultSupplier = resultSupplier; + } + + void runBefore() { + updateState(OperationStatus.PENDING); + } + + void runAfter() { + updateState(OperationStatus.FINISHED); + } + + public void run(ExecutorService service) { + invocation = + service.submit( + () -> { + try { + runBefore(); Review Comment: The operation should switch to pending before it is scheduled (i.e., before the task is submitted to the executor). ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** Entity that contains the resource to run and the execution results. */ +public class Operation { + + private static final Logger LOG = LoggerFactory.getLogger(Operation.class); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private final OperationHandle operationHandle; + + private final OperationType operationType; + private final boolean hasResults; + private OperationStatus status; + + private final Callable<ResultFetcher> resultSupplier; + + private Future<?> invocation; + private ResultFetcher resultFetcher; + private SqlExecutionException operationError; + + public Operation( + OperationHandle operationHandle, + OperationType operationType, + Callable<ResultFetcher> resultSupplier) { + this.operationHandle = operationHandle; + this.status = OperationStatus.INITIALIZED; + this.operationType = operationType; + this.hasResults = true; + this.resultSupplier = resultSupplier; + } + + void runBefore() { + updateState(OperationStatus.PENDING); + } + + void runAfter() { + updateState(OperationStatus.FINISHED); + } + + public void run(ExecutorService service) { + invocation = + service.submit( + () -> { + try { + runBefore(); + updateState(OperationStatus.RUNNING); + ResultFetcher fetcher = resultSupplier.call(); + writeLock(() -> resultFetcher = fetcher); + runAfter(); + } catch (Exception e) { + String msg = + String.format( + "Failed to execute the operation %s.", + operationHandle); + LOG.error(msg, e); + writeLock( + () -> { + updateState(OperationStatus.ERROR); + operationError = new SqlExecutionException(msg, e); + }); + } + }); + } + + public void cancel() { + writeLock( + () -> { + if (invocation != null && !invocation.isDone()) { + invocation.cancel(true); + } + + if (resultFetcher != null) { + resultFetcher.close(); + resultFetcher = null; + } + + updateState(OperationStatus.CANCELED); + }); + } + + public void close() { + writeLock( + () -> { + if (invocation != null && !invocation.isDone()) { + invocation.cancel(true); + } + + if (resultFetcher != null) { + 
resultFetcher.close(); + resultFetcher = null; + } + + updateState(OperationStatus.CLOSED); + }); + } + + public ResultSet fetchResults(long token, int maxRows) { + OperationStatus currentStatus = readLock(() -> status); + + if (currentStatus == OperationStatus.ERROR) { + throw operationError; + } else if (currentStatus == OperationStatus.FINISHED) { + return resultFetcher.fetchResults(token, maxRows); Review Comment: The `resultFetcher` may be under closing when calling `fetchResult`. We should add a `volatile boolean closed` flag in ResultFetcher to avoid race conditions. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java: ########## @@ -0,0 +1,326 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.gateway.common.SqlGatewayService; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.service.session.SessionManager; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestResource; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.RunnableWithException; + +import org.assertj.core.api.Assertions; +import org.hamcrest.Matchers; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; + +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.apache.flink.types.RowKind.DELETE; +import static 
org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** ITCase for {@link SqlGatewayServiceImpl}. */ +public class SqlGatewayServiceITCase extends AbstractTestBase { + + @ClassRule + public static final SqlGatewayServiceTestResource SQL_GATEWAY_SERVICE_RESOURCE = + new SqlGatewayServiceTestResource(); + + private static SessionManager sessionManager; + private static SqlGatewayService service; + + private final SessionEnvironment defaultSessionEnvironment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .build(); + + @BeforeClass + public static void setup() { + sessionManager = SQL_GATEWAY_SERVICE_RESOURCE.getSessionManager(); + service = SQL_GATEWAY_SERVICE_RESOURCE.getService(); + } + + @Test + public void testOpenSessionWithConfig() { + Map<String, String> options = new HashMap<>(); + options.put("key1", "val1"); + options.put("key2", "val2"); + SessionEnvironment environment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .addSessionConfig(options) + .build(); + + SessionHandle sessionHandle = service.openSession(environment); + Map<String, String> actualConfig = service.getSessionConfig(sessionHandle); + + assertThat( + actualConfig, + allOf( + options.entrySet().stream() + .map(entry -> Matchers.hasEntry(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()))); + } Review Comment: I was thinking we can simply assert by `assertEquals(actualConfig, options)`. However, it seems there are more entries in `actualConfig`. If that is the case, we can use `IsMapContaining`. ```java options.forEach((k, v) -> assertThat(actualConfig, IsMapContaining.hasEntry(k, v))); ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +/** + * A fetcher to fetch result from submitted statement. + * + * <p>The fetcher uses the {@link Iterator} model. It means every time fetch the result with the + * current token, the fetcher will move forward and retire the old data. + */ +public class ResultFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class); + + public static final int FETCH_ALL = Integer.MAX_VALUE; + + private final OperationHandle operationHandle; + + private final ResolvedSchema resultSchema; + private final ResultStore resultStore; + private final LinkedList<RowData> bufferedResults = new LinkedList<>(); + private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>(); + + private long currentToken = 0; + private int previousMaxFetchSize = 0; + private boolean noMoreResults = false; + + public ResultFetcher( + OperationHandle operationHandle, + ResolvedSchema resultSchema, + CloseableIterator<RowData> resultRows, + int maxBufferSize) { + this.operationHandle = operationHandle; + this.resultSchema = resultSchema; + this.resultStore = new ResultStore(resultRows, maxBufferSize); + } + + public void close() { + resultStore.close(); + } + + public ResultSet fetchResults(long token, int maxFetchSize) { + if (maxFetchSize <= 0) { + throw new IllegalArgumentException( + String.format( + "The max rows should be larger than 0 or use %s to fetch all available data.", + FETCH_ALL)); + } + + if (token == currentToken) { + // equal to the Iterator.next() + if (noMoreResults) { + LOG.debug("There is no more result for operation: {}.", operationHandle); + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + + // a new token arrives, move the current buffer data into the prev buffered results. 
+ bufferedPrevResults.clear(); + if (bufferedResults.isEmpty()) { + // buffered results have been totally consumed, + // so try to fetch new results + Optional<List<RowData>> newResults = resultStore.retrieveRecords(); + if (newResults.isPresent()) { + bufferedResults.addAll(newResults.get()); + } else { + noMoreResults = true; + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + } + + previousMaxFetchSize = maxFetchSize; + int resultSize; + if (maxFetchSize != FETCH_ALL) { + resultSize = Math.min(bufferedResults.size(), maxFetchSize); + } else { + resultSize = bufferedResults.size(); + } + + LOG.debug( + "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, realReturnSize: {}.", Review Comment: ```suggestion "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, resultSize: {}.", ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationManager; + +import java.io.Closeable; +import java.util.Map; + +/** Session that manages the registered resource including jars, registered table. */ +public class Session implements Closeable { + + private final SessionContext sessionContext; + private long lastAccessTime; Review Comment: Ticket: https://issues.apache.org/jira/browse/FLINK-28053 ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ExecutionResult; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** Entity that contains the resource to run and the execution results. */ +public class Operation { Review Comment: ???? ########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/session/SessionEnvironment.java: ########## @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common.session; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.gateway.common.endpoint.EndpointVersion; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** Environment to initialize the {@code Session}. */ +@PublicEvolving +public class SessionEnvironment { + private final @Nullable String sessionName; + private final EndpointVersion version; + private final List<String> libs; + private final List<String> jars; + private final Map<String, String> sessionConfig; + + @VisibleForTesting + SessionEnvironment( + @Nullable String sessionName, + EndpointVersion version, + List<String> libs, + List<String> jars, + Map<String, String> sessionConfig) { + this.sessionName = sessionName; + this.version = version; + this.libs = checkNotNull(libs); + this.jars = checkNotNull(jars); Review Comment: I mean shall we check whether it's a valid path, valid jar file, or existing path/file? 
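If we do want that, something along these lines could work for the local-filesystem case; all names below are illustrative, not from the PR:

```java
import java.io.File;
import java.util.List;
import java.util.Locale;

// Illustrative validation: every entry must point at an existing, readable
// .jar file on the local filesystem.
private static List<String> checkJarPaths(List<String> jars) {
    for (String jar : jars) {
        File file = new File(jar);
        if (!file.isFile() || !file.canRead()) {
            throw new IllegalArgumentException(
                    String.format("The jar path '%s' is not a readable file.", jar));
        }
        if (!jar.toLowerCase(Locale.ROOT).endsWith(".jar")) {
            throw new IllegalArgumentException(
                    String.format("The path '%s' does not point at a jar file.", jar));
        }
    }
    return jars;
}
```

Remote URIs (e.g., `hdfs://...`) would need a different kind of check, which might be a reason to keep the constructor at `checkNotNull` and validate later.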
########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/SqlGatewayService.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; + +import java.util.Map; +import java.util.concurrent.Callable; + +/** + * A service of SQL gateway is responsible for talking requests from the endpoints. from the + * endpoints. Review Comment: ```suggestion * A service of SQL gateway is responsible for handling requests from the endpoints. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
