This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 626bdacdbe303d9e9c346e7ba7f3d8c7b42b1d8e Author: Shengkai <1059623...@qq.com> AuthorDate: Wed Jun 22 20:21:54 2022 +0800 [FLINK-27766][sql-gateway] Introduce the OperationManager for the SqlGatewayService --- .../flink/table/gateway/api/SqlGatewayService.java | 69 ++++ .../gateway/api/operation/OperationHandle.java | 66 ++++ .../gateway/api/operation/OperationStatus.java | 89 +++++ .../table/gateway/api/operation/OperationType.java | 31 ++ .../table/gateway/api/results/OperationInfo.java | 69 ++++ .../flink/table/gateway/api/results/ResultSet.java | 131 +++++++ .../gateway/api/operation/OperationStatusTest.java | 113 ++++++ .../gateway/service/SqlGatewayServiceImpl.java | 64 ++++ .../gateway/service/context/SessionContext.java | 24 +- .../table/gateway/service/operation/Operation.java | 160 +++++++++ .../service/operation/OperationManager.java | 199 +++++++++++ .../gateway/service/result/ResultFetcher.java | 173 ++++++++++ .../table/gateway/service/result/ResultStore.java | 149 ++++++++ .../table/gateway/service/session/Session.java | 8 + .../gateway/service/session/SessionManager.java | 8 +- .../gateway/service/SqlGatewayServiceITCase.java | 382 +++++++++++++++++++++ .../gateway/service/result/ResultFetcherTest.java | 364 ++++++++++++++++++++ .../src/test/resources/log4j2-test.properties | 28 ++ 18 files changed, 2123 insertions(+), 4 deletions(-) diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java index 933c797f243..c0bdf01c36e 100644 --- a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/SqlGatewayService.java @@ -19,11 +19,16 @@ package org.apache.flink.table.gateway.api; import org.apache.flink.annotation.PublicEvolving; +import 
org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationType; +import org.apache.flink.table.gateway.api.results.OperationInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; import java.util.Map; +import java.util.concurrent.Callable; /** A service of SQL gateway is responsible for handling requests from the endpoints. */ @PublicEvolving @@ -55,4 +60,68 @@ public interface SqlGatewayService { * @return Returns configuration of the session. */ Map<String, String> getSessionConfig(SessionHandle sessionHandle) throws SqlGatewayException; + + // ------------------------------------------------------------------------------------------- + // Operation Management + // ------------------------------------------------------------------------------------------- + + /** + * Submit an operation and execute it. The {@link SqlGatewayService} will take care of the + * execution and assign the {@link OperationHandle} used to retrieve the results later. + * + * @param sessionHandle handle to identify the session. + * @param type describe the operation type. + * @param executor the main logic to get the execution results. + * @return Returns the handle used to retrieve the results later. + */ + OperationHandle submitOperation( + SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor) + throws SqlGatewayException; + + /** + * Cancel the operation when it is not in terminal status. + * + * <p>It can't cancel an Operation if it is terminated. + * + * @param sessionHandle handle to identify the session.
+ * @param operationHandle handle to identify the operation. + */ + void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException; + + /** + * Close the operation and release all resources used by the operation. + * + * @param sessionHandle handle to identify the session. + * @param operationHandle handle to identify the operation. + */ + void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException; + + /** + * Get the {@link OperationInfo} of the operation. + * + * @param sessionHandle handle to identify the session. + * @param operationHandle handle to identify the operation. + */ + OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle) + throws SqlGatewayException; + + // ------------------------------------------------------------------------------------------- + // Statements + // ------------------------------------------------------------------------------------------- + + /** + * Fetch the results from the operation. When maxRows is Integer.MAX_VALUE, it means to fetch + * all available data. + * + * @param sessionHandle handle to identify the session. + * @param operationHandle handle to identify the operation. + * @param token token to identify results. + * @param maxRows max number of rows to fetch. + * @return Returns the results.
+ */ + ResultSet fetchResults( + SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) + throws SqlGatewayException; } diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java new file mode 100644 index 00000000000..dfdb3a6b173 --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationHandle.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.operation; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.gateway.api.HandleIdentifier; + +import java.util.Objects; +import java.util.UUID; + +/** {@link OperationHandle} to index the {@code Operation}. 
*/ +@PublicEvolving +public class OperationHandle { + + private final HandleIdentifier identifier; + + public static OperationHandle create() { + return new OperationHandle(new HandleIdentifier(UUID.randomUUID(), UUID.randomUUID())); + } + + public OperationHandle(HandleIdentifier identifier) { + this.identifier = identifier; + } + + public HandleIdentifier getIdentifier() { + return identifier; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof OperationHandle)) { + return false; + } + OperationHandle that = (OperationHandle) o; + return Objects.equals(identifier, that.identifier); + } + + @Override + public int hashCode() { + return Objects.hash(identifier); + } + + @Override + public String toString() { + return identifier.toString(); + } +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationStatus.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationStatus.java new file mode 100644 index 00000000000..4288276c19f --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationStatus.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.operation; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +/** Status to describe the {@code Operation}. */ +@PublicEvolving +public enum OperationStatus { + /** The operation is newly created. */ + INITIALIZED(false), + + /** Prepare the resources for the operation. */ + PENDING(false), + + /** The operation is running. */ + RUNNING(false), + + /** All the work is finished and ready for the client to fetch the results. */ + FINISHED(true), + + /** Operation has been cancelled. */ + CANCELED(true), + + /** Operation has been closed and all related resources are collected. */ + CLOSED(true), + + /** Some error happens. */ + ERROR(true), + + /** The execution of the operation timeout. */ + TIMEOUT(true); + + private final boolean isTerminalStatus; + + OperationStatus(boolean isTerminalStatus) { + this.isTerminalStatus = isTerminalStatus; + } + + public static boolean isValidStatusTransition( + OperationStatus fromStatus, OperationStatus toStatus) { + return toOperationStatusSet(fromStatus).contains(toStatus); + } + + public boolean isTerminalStatus() { + return isTerminalStatus; + } + + private static Set<OperationStatus> toOperationStatusSet(OperationStatus fromStatus) { + switch (fromStatus) { + case INITIALIZED: + return new HashSet<>(Arrays.asList(PENDING, CANCELED, CLOSED, TIMEOUT, ERROR)); + case PENDING: + return new HashSet<>(Arrays.asList(RUNNING, CANCELED, CLOSED, TIMEOUT, ERROR)); + case RUNNING: + return new HashSet<>(Arrays.asList(FINISHED, CANCELED, CLOSED, TIMEOUT, ERROR)); + case FINISHED: + case CANCELED: + case TIMEOUT: + case ERROR: + return Collections.singleton(CLOSED); + case CLOSED: + return Collections.emptySet(); + default: + throw new IllegalArgumentException("Unknown 
from status: " + fromStatus); + } + } +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java new file mode 100644 index 00000000000..babe99fd3f9 --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/operation/OperationType.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.operation; + +import org.apache.flink.annotation.PublicEvolving; + +/** The Operation Type. */ +@PublicEvolving +public enum OperationType { + /** The type indicates the operation executes statements. */ + EXECUTE_STATEMENT, + + /** The type indicates the operation is unknown. 
*/ + UNKNOWN; +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java new file mode 100644 index 00000000000..7ebb28f5169 --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/OperationInfo.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.results; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.gateway.api.operation.OperationStatus; +import org.apache.flink.table.gateway.api.operation.OperationType; + +import java.util.Objects; + +/** Information of the {@code Operation}. 
*/ +@PublicEvolving +public class OperationInfo { + + private final OperationStatus status; + private final boolean hasResults; + private final OperationType type; + + public OperationInfo(OperationStatus status, OperationType type, boolean hasResults) { + this.status = status; + this.type = type; + this.hasResults = hasResults; + } + + public boolean isHasResults() { + return hasResults; + } + + public OperationType getType() { + return type; + } + + public OperationStatus getStatus() { + return status; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof OperationInfo)) { + return false; + } + OperationInfo that = (OperationInfo) o; + return hasResults == that.hasResults && status == that.status && type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(status, hasResults, type); + } +} diff --git a/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java new file mode 100644 index 00000000000..3750a1cee95 --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/results/ResultSet.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.results; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** The collection of the results. */ +@PublicEvolving +public class ResultSet { + + private final ResultType resultType; + + @Nullable private final Long nextToken; + + private final ResolvedSchema resultSchema; + private final List<RowData> data; + + public static final ResultSet NOT_READY_RESULTS = + new ResultSet( + ResultType.NOT_READY, + 0L, + ResolvedSchema.of(Collections.emptyList()), + Collections.emptyList()); + + public ResultSet( + ResultType resultType, + @Nullable Long nextToken, + ResolvedSchema resultSchema, + List<RowData> data) { + this.nextToken = nextToken; + this.resultType = resultType; + this.resultSchema = resultSchema; + this.data = data; + } + + /** Get the type of the results, which may indicate the result is EOS or has data. */ + public ResultType getResultType() { + return resultType; + } + + /** + * The token indicates the next batch of the data. + * + * <p>When the token is null, it means all the data has been fetched. + */ + public @Nullable Long getNextToken() { + return nextToken; + } + + /** The schema of the data. */ + public ResolvedSchema getResultSchema() { + return resultSchema; + } + + /** All the data in the current results. 
*/ + public List<RowData> getData() { + return data; + } + + @Override + public String toString() { + return String.format( + "ResultSet{\n" + + " resultType=%s,\n" + + " nextToken=%s,\n" + + " resultSchema=%s,\n" + + " data=[%s]\n" + + "}", + resultType, + nextToken, + resultSchema.toString(), + data.stream().map(Object::toString).collect(Collectors.joining(","))); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ResultSet)) { + return false; + } + ResultSet resultSet = (ResultSet) o; + return resultType == resultSet.resultType + && Objects.equals(nextToken, resultSet.nextToken) + && Objects.equals(resultSchema, resultSet.resultSchema) + && Objects.equals(data, resultSet.data); + } + + @Override + public int hashCode() { + return Objects.hash(resultType, nextToken, resultSchema, data); + } + + /** Describe the kind of the result. */ + public enum ResultType { + /** Indicate the result is not ready. */ + NOT_READY, + + /** Indicate the result has data. */ + PAYLOAD, + + /** Indicate all results have been fetched. */ + EOS + } +} diff --git a/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/operation/OperationStatusTest.java b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/operation/OperationStatusTest.java new file mode 100644 index 00000000000..4fb9b04c83c --- /dev/null +++ b/flink-table/flink-sql-gateway-api/src/test/java/org/apache/flink/table/gateway/api/operation/OperationStatusTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.api.operation; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +import static org.apache.flink.table.gateway.api.operation.OperationStatus.CANCELED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.CLOSED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.ERROR; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.FINISHED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.INITIALIZED; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.PENDING; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.RUNNING; +import static org.apache.flink.table.gateway.api.operation.OperationStatus.TIMEOUT; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link OperationStatus}. 
*/ +public class OperationStatusTest { + + @Test + public void testSupportedTransition() { + for (OperationStatusTransition translation : getSupportedOperationStatusTransition()) { + assertTrue(OperationStatus.isValidStatusTransition(translation.from, translation.to)); + } + } + + @Test + public void testUnsupportedTransition() { + Set<OperationStatusTransition> supported = getSupportedOperationStatusTransition(); + for (OperationStatus from : OperationStatus.values()) { + for (OperationStatus to : OperationStatus.values()) { + if (supported.contains(new OperationStatusTransition(from, to))) { + continue; + } + + assertFalse(OperationStatus.isValidStatusTransition(from, to)); + } + } + } + + private Set<OperationStatusTransition> getSupportedOperationStatusTransition() { + return new HashSet<>( + Arrays.asList( + new OperationStatusTransition(INITIALIZED, PENDING), + new OperationStatusTransition(PENDING, RUNNING), + new OperationStatusTransition(RUNNING, FINISHED), + new OperationStatusTransition(INITIALIZED, CANCELED), + new OperationStatusTransition(PENDING, CANCELED), + new OperationStatusTransition(RUNNING, CANCELED), + new OperationStatusTransition(INITIALIZED, TIMEOUT), + new OperationStatusTransition(PENDING, TIMEOUT), + new OperationStatusTransition(RUNNING, TIMEOUT), + new OperationStatusTransition(INITIALIZED, ERROR), + new OperationStatusTransition(PENDING, ERROR), + new OperationStatusTransition(RUNNING, ERROR), + new OperationStatusTransition(INITIALIZED, CLOSED), + new OperationStatusTransition(PENDING, CLOSED), + new OperationStatusTransition(RUNNING, CLOSED), + new OperationStatusTransition(CANCELED, CLOSED), + new OperationStatusTransition(TIMEOUT, CLOSED), + new OperationStatusTransition(ERROR, CLOSED), + new OperationStatusTransition(FINISHED, CLOSED))); + } + + private static class OperationStatusTransition { + OperationStatus from; + OperationStatus to; + + public OperationStatusTransition(OperationStatus from, OperationStatus to) { + 
this.from = from; + this.to = to; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof OperationStatusTransition)) { + return false; + } + OperationStatusTransition that = (OperationStatusTransition) o; + return from == that.from && to == that.to; + } + + @Override + public int hashCode() { + return Objects.hash(from, to); + } + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java index 2606f8f7763..39e253c020b 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/SqlGatewayServiceImpl.java @@ -20,6 +20,10 @@ package org.apache.flink.table.gateway.service; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.gateway.api.SqlGatewayService; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationType; +import org.apache.flink.table.gateway.api.results.OperationInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.SqlGatewayException; @@ -30,6 +34,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.concurrent.Callable; /** The implementation of the {@link SqlGatewayService} interface. 
*/ public class SqlGatewayServiceImpl implements SqlGatewayService { @@ -73,6 +78,65 @@ public class SqlGatewayServiceImpl implements SqlGatewayService { } } + @Override + public OperationHandle submitOperation( + SessionHandle sessionHandle, OperationType type, Callable<ResultSet> executor) + throws SqlGatewayException { + try { + return getSession(sessionHandle).getOperationManager().submitOperation(type, executor); + } catch (Throwable e) { + LOG.error("Failed to submitOperation.", e); + throw new SqlGatewayException("Failed to submitOperation.", e); + } + } + + @Override + public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle) { + try { + getSession(sessionHandle).getOperationManager().cancelOperation(operationHandle); + } catch (Throwable t) { + LOG.error("Failed to cancelOperation.", t); + throw new SqlGatewayException("Failed to cancelOperation.", t); + } + } + + @Override + public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle) { + try { + getSession(sessionHandle).getOperationManager().closeOperation(operationHandle); + } catch (Throwable t) { + LOG.error("Failed to closeOperation.", t); + throw new SqlGatewayException("Failed to closeOperation.", t); + } + } + + @Override + public OperationInfo getOperationInfo( + SessionHandle sessionHandle, OperationHandle operationHandle) { + try { + return getSession(sessionHandle) + .getOperationManager() + .getOperationInfo(operationHandle); + } catch (Throwable t) { + LOG.error("Failed to getOperationInfo.", t); + throw new SqlGatewayException("Failed to getOperationInfo.", t); + } + } + + @Override + public ResultSet fetchResults( + SessionHandle sessionHandle, OperationHandle operationHandle, long token, int maxRows) + throws SqlGatewayException { + try { + return getSession(sessionHandle) + .getOperationManager() + .fetchResults(operationHandle, token, maxRows); + } catch (Throwable t) { + LOG.error("Failed to fetchResults.", t); + throw new 
SqlGatewayException("Failed to fetchResults.", t); + } + } + @VisibleForTesting Session getSession(SessionHandle sessionHandle) { return sessionManager.getSession(sessionHandle); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 83bc5758ce0..b2e3921ba07 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -37,6 +37,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.gateway.api.endpoint.EndpointVersion; import org.apache.flink.table.gateway.api.session.SessionHandle; +import org.apache.flink.table.gateway.service.operation.OperationManager; import org.apache.flink.table.module.ModuleManager; import org.slf4j.Logger; @@ -51,6 +52,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; /** * Context describing a session, it's mainly used for user to open a new session in the backend. 
If @@ -69,17 +71,21 @@ public class SessionContext { private final SessionState sessionState; private final URLClassLoader userClassloader; + private final OperationManager operationManager; + private SessionContext( SessionHandle sessionId, EndpointVersion endpointVersion, Configuration sessionConf, URLClassLoader classLoader, - SessionState sessionState) { + SessionState sessionState, + OperationManager operationManager) { this.sessionId = sessionId; this.endpointVersion = endpointVersion; this.sessionConf = sessionConf; this.userClassloader = classLoader; this.sessionState = sessionState; + this.operationManager = operationManager; } // -------------------------------------------------------------------------------------------- @@ -108,6 +114,8 @@ public class SessionContext { } catch (IOException e) { LOG.debug("Error while closing class loader.", e); } + + operationManager.close(); } // -------------------------------------------------------------------------------------------- @@ -118,7 +126,8 @@ public class SessionContext { DefaultContext defaultContext, SessionHandle sessionId, EndpointVersion endpointVersion, - Configuration sessionConf) { + Configuration sessionConf, + ExecutorService operationExecutorService) { // -------------------------------------------------------------------------------------------------------------- // Init config // -------------------------------------------------------------------------------------------------------------- @@ -159,7 +168,12 @@ public class SessionContext { new SessionState(catalogManager, moduleManager, functionCatalog); return new SessionContext( - sessionId, endpointVersion, configuration, classLoader, sessionState); + sessionId, + endpointVersion, + configuration, + classLoader, + sessionState, + new OperationManager(operationExecutorService)); } private static URLClassLoader buildClassLoader( @@ -204,6 +218,10 @@ public class SessionContext { userClassloader); } + public OperationManager 
getOperationManager() { + return operationManager; + } + private TableEnvironmentInternal createStreamTableEnvironment( StreamExecutionEnvironment env, EnvironmentSettings settings, diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java new file mode 100644 index 00000000000..2262563edc6 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationStatus; +import org.apache.flink.table.gateway.api.operation.OperationType; +import org.apache.flink.table.gateway.api.results.OperationInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.table.gateway.api.results.ResultSet.NOT_READY_RESULTS; + +/** Operation to manage the execution, results and so on. 
*/ +public class Operation { + + private static final Logger LOG = LoggerFactory.getLogger(Operation.class); + + private final OperationHandle operationHandle; + + private final OperationType operationType; + private final boolean hasResults; + private final AtomicReference<OperationStatus> status; + + private final Callable<ResultFetcher> resultSupplier; + + private Future<?> invocation; + private volatile ResultFetcher resultFetcher; + private volatile SqlExecutionException operationError; + + public Operation( + OperationHandle operationHandle, + OperationType operationType, + Callable<ResultFetcher> resultSupplier) { + this.operationHandle = operationHandle; + this.status = new AtomicReference<>(OperationStatus.INITIALIZED); + this.operationType = operationType; + this.hasResults = true; + this.resultSupplier = resultSupplier; + } + + void runBefore() { + updateState(OperationStatus.RUNNING); + } + + void runAfter() { + updateState(OperationStatus.FINISHED); + } + + public void run(ExecutorService service) { + updateState(OperationStatus.PENDING); + invocation = + service.submit( + () -> { + try { + runBefore(); + resultFetcher = resultSupplier.call(); + runAfter(); + } catch (Exception e) { + String msg = + String.format( + "Failed to execute the operation %s.", + operationHandle); + LOG.error(msg, e); + operationError = new SqlExecutionException(msg, e); + updateState(OperationStatus.ERROR); + } + }); + } + + public void cancel() { + updateState(OperationStatus.CANCELED); + closeResources(); + } + + public void close() { + updateState(OperationStatus.CLOSED); + closeResources(); + } + + public ResultSet fetchResults(long token, int maxRows) { + OperationStatus currentStatus = status.get(); + + if (currentStatus == OperationStatus.ERROR) { + throw operationError; + } else if (currentStatus == OperationStatus.FINISHED) { + return resultFetcher.fetchResults(token, maxRows); + } else if (currentStatus == OperationStatus.RUNNING + || currentStatus == 
OperationStatus.PENDING + || currentStatus == OperationStatus.INITIALIZED) { + return NOT_READY_RESULTS; + } else { + throw new SqlGatewayException( + String.format( + "Can not fetch results from the %s in %s status.", + operationHandle, currentStatus)); + } + } + + public OperationInfo getOperationInfo() { + return new OperationInfo(status.get(), operationType, hasResults); + } + + private void updateState(OperationStatus toStatus) { + OperationStatus currentStatus; + do { + currentStatus = status.get(); + boolean isValid = OperationStatus.isValidStatusTransition(currentStatus, toStatus); + if (!isValid) { + String message = + String.format( + "Failed to convert the Operation Status from %s to %s for %s.", + currentStatus, toStatus, operationHandle); + LOG.error(message); + throw new SqlGatewayException(message); + } + } while (!status.compareAndSet(currentStatus, toStatus)); + + LOG.debug( + String.format( + "Convert operation %s from %s to %s.", + operationHandle, currentStatus, toStatus)); + } + + private void closeResources() { + if (invocation != null && !invocation.isDone()) { + invocation.cancel(true); + } + + if (resultFetcher != null) { + resultFetcher.close(); + } + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java new file mode 100644 index 00000000000..878674a0e72 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationType; +import org.apache.flink.table.gateway.api.results.OperationInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** Manager for the {@link Operation}. 
*/ +public class OperationManager { + + private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Map<OperationHandle, Operation> submittedOperations; + private final ExecutorService service; + + private boolean isRunning; + + public OperationManager(ExecutorService service) { + this.service = service; + this.submittedOperations = new HashMap<>(); + this.isRunning = true; + } + + /** + * Submit the operation to the {@link OperationManager}. The {@link OperationManager} manges the + * lifecycle of the {@link Operation}, including register resources, fire the execution and so + * on. + * + * @param operationType The type of the submitted operation. + * @param executor Worker to execute. + * @return OperationHandle to fetch the results or check the status. + */ + public OperationHandle submitOperation( + OperationType operationType, Callable<ResultSet> executor) { + OperationHandle handle = OperationHandle.create(); + Operation operation = + new Operation( + handle, + operationType, + () -> { + ResultSet resultSet = executor.call(); + List<RowData> rows = resultSet.getData(); + return new ResultFetcher( + handle, + resultSet.getResultSchema(), + CloseableIterator.adapterForIterator(rows.iterator()), + rows.size()); + }); + + writeLock( + () -> { + submittedOperations.put(handle, operation); + operation.run(service); + }); + return handle; + } + + /** + * Cancel the execution of the operation. + * + * @param operationHandle identifies the {@link Operation}. + */ + public void cancelOperation(OperationHandle operationHandle) { + getOperation(operationHandle).cancel(); + } + + /** + * Close the operation and release all resources used by the {@link Operation}. + * + * @param operationHandle identifies the {@link Operation}. 
+ */ + public void closeOperation(OperationHandle operationHandle) { + writeLock( + () -> { + Operation opToRemove = submittedOperations.remove(operationHandle); + if (opToRemove != null) { + opToRemove.close(); + } + }); + } + + /** + * Get the {@link OperationInfo} of the operation. + * + * @param operationHandle identifies the {@link Operation}. + */ + public OperationInfo getOperationInfo(OperationHandle operationHandle) { + return getOperation(operationHandle).getOperationInfo(); + } + + /** + * Get the results of the operation. + * + * @param operationHandle identifies the {@link Operation}. + * @param token identifies which batch of data to fetch. + * @param maxRows the maximum number of rows to fetch. + * @return ResultSet contains the results. + */ + public ResultSet fetchResults(OperationHandle operationHandle, long token, int maxRows) { + return getOperation(operationHandle).fetchResults(token, maxRows); + } + + /** Closes the {@link OperationManager} and all operations. */ + public void close() { + lock.writeLock().lock(); + try { + isRunning = false; + for (Operation operation : submittedOperations.values()) { + operation.close(); + } + submittedOperations.clear(); + } finally { + lock.writeLock().unlock(); + } + LOG.debug("Closes the Operation Manager."); + } + + // ------------------------------------------------------------------------------------------- + + @VisibleForTesting + public int getOperationCount() { + return submittedOperations.size(); + } + + @VisibleForTesting + public Operation getOperation(OperationHandle operationHandle) { + return readLock( + () -> { + Operation operation = submittedOperations.get(operationHandle); + if (operation == null) { + throw new SqlGatewayException( + String.format( + "Can not find the submitted operation in the OperationManager with the %s.", + operationHandle)); + } + return operation; + }); + } + + private void writeLock(Runnable runner) { + lock.writeLock().lock(); + try { + if (!isRunning) { + throw 
new SqlGatewayException("The OperationManager is closed."); + } + runner.run(); + } finally { + lock.writeLock().unlock(); + } + } + + private <T> T readLock(Supplier<T> supplier) { + lock.readLock().lock(); + try { + if (!isRunning) { + throw new SqlGatewayException("The OperationManager is closed."); + } + return supplier.get(); + } finally { + lock.readLock().unlock(); + } + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java new file mode 100644 index 00000000000..145639552dd --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +/** + * A fetcher to fetch result from submitted statement. + * + * <p>The fetcher uses the {@link Iterator} model. It means every time fetch the result with the + * current token, the fetcher will move forward and retire the old data. + * + * <p>After closes, the fetcher will not fetch the results from the remote but is able to return all + * data in the local cache. + */ +public class ResultFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class); + + private final OperationHandle operationHandle; + + private final ResolvedSchema resultSchema; + private final ResultStore resultStore; + private final LinkedList<RowData> bufferedResults = new LinkedList<>(); + private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>(); + + private long currentToken = 0; + private boolean noMoreResults = false; + + public ResultFetcher( + OperationHandle operationHandle, + ResolvedSchema resultSchema, + CloseableIterator<RowData> resultRows, + int maxBufferSize) { + this.operationHandle = operationHandle; + this.resultSchema = resultSchema; + this.resultStore = new ResultStore(resultRows, maxBufferSize); + } + + public void close() { + resultStore.close(); + } + + /** + * Fetch results from the result store. 
It tries to return the data cached in the buffer first. + * If the buffer is empty, then fetch results from the {@link ResultStore}. It's possible + * multiple threads try to fetch results in parallel. To keep the data integration, use the + * synchronized to allow only one thread can fetch the result at any time. TODO: we should + * forbid concurrently fetch results in the FLINK-28053. + */ + public synchronized ResultSet fetchResults(long token, int maxFetchSize) { + if (maxFetchSize <= 0) { + throw new IllegalArgumentException("The max rows should be larger than 0."); + } + + if (token == currentToken) { + // equal to the Iterator.next() + if (noMoreResults) { + LOG.debug("There is no more result for operation: {}.", operationHandle); + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + + // a new token arrives, move the current buffer data into the prev buffered results. + bufferedPrevResults.clear(); + if (bufferedResults.isEmpty()) { + // buffered results have been totally consumed, + // so try to fetch new results + Optional<List<RowData>> newResults = resultStore.retrieveRecords(); + + if (newResults.isPresent()) { + bufferedResults.addAll(newResults.get()); + } else { + noMoreResults = true; + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + } + + int resultSize = Math.min(bufferedResults.size(), maxFetchSize); + LOG.debug( + "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, resultSize: {}.", + operationHandle, + token, + maxFetchSize, + resultSize); + + // move forward + currentToken++; + // move result to buffer + for (int i = 0; i < resultSize; i++) { + bufferedPrevResults.add(bufferedResults.removeFirst()); + } + return new ResultSet( + ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, bufferedPrevResults); + } else if (token == currentToken - 1 && token >= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Fetching 
previous result for operation: {}, token: {}, maxFetchSize: {}", + operationHandle, + token, + maxFetchSize); + } + if (maxFetchSize < bufferedPrevResults.size()) { + String msg = + String.format( + "As the same token is provided, fetch size must be not less than the previous returned buffer size." + + " Previous returned result size is %s, current max_fetch_size to be %s.", + bufferedPrevResults.size(), maxFetchSize); + if (LOG.isDebugEnabled()) { + LOG.error(msg); + } + throw new SqlExecutionException(msg); + } + return new ResultSet( + ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, bufferedPrevResults); + } else { + String msg; + if (currentToken == 0) { + msg = "Expecting token to be 0, but found " + token + "."; + } else { + msg = + "Expecting token to be " + + currentToken + + " or " + + (currentToken - 1) + + ", but found " + + token + + "."; + } + if (LOG.isDebugEnabled()) { + LOG.error(msg); + } + throw new SqlExecutionException(msg); + } + } + + @VisibleForTesting + public ResultStore getResultStore() { + return resultStore; + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java new file mode 100644 index 00000000000..180da8f9e59 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultStore.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; + +/** A result store which stores and buffers results. */ +public class ResultStore { + + private static final Logger LOG = LoggerFactory.getLogger(ResultStore.class); + + private final CloseableIterator<RowData> result; + private final List<RowData> recordsBuffer = new ArrayList<>(); + private final int maxBufferSize; + + private final Object resultLock = new Object(); + private final AtomicReference<SqlExecutionException> executionException = + new AtomicReference<>(); + private final ResultRetrievalThread retrievalThread = new ResultRetrievalThread(); + + public ResultStore(CloseableIterator<RowData> result, int maxBufferSize) { + this.result = result; + this.maxBufferSize = maxBufferSize; + this.retrievalThread.start(); + } + + public void close() { + retrievalThread.isRunning = false; + retrievalThread.interrupt(); + + try { + result.close(); + } catch (Exception e) { + LOG.error("Failed to close the ResultStore. 
Ignore the error.", e); + } + } + + public Optional<List<RowData>> retrieveRecords() { + synchronized (resultLock) { + // retrieval thread is alive return a record if available + // but the program must not have failed + if (isRetrieving() && executionException.get() == null) { + if (recordsBuffer.isEmpty()) { + return Optional.of(Collections.emptyList()); + } else { + final List<RowData> change = new ArrayList<>(recordsBuffer); + recordsBuffer.clear(); + resultLock.notify(); + return Optional.of(change); + } + } + // retrieval thread is dead but there is still a record to be delivered + else if (!isRetrieving() && !recordsBuffer.isEmpty()) { + final List<RowData> change = new ArrayList<>(recordsBuffer); + recordsBuffer.clear(); + return Optional.of(change); + } + // no results can be returned anymore + else { + return handleMissingResult(); + } + } + } + + @VisibleForTesting + public int getBufferedRecordSize() { + synchronized (resultLock) { + return recordsBuffer.size(); + } + } + + private boolean isRetrieving() { + return retrievalThread.isRunning; + } + + private Optional<List<RowData>> handleMissingResult() { + if (executionException.get() != null) { + throw executionException.get(); + } + + // we assume that a bounded job finished + return Optional.empty(); + } + + private void processRecord(RowData row) { + synchronized (resultLock) { + // wait if the buffer is full + if (recordsBuffer.size() >= maxBufferSize) { + try { + resultLock.wait(); + } catch (InterruptedException e) { + // ignore + } + } + recordsBuffer.add(row); + } + } + + // -------------------------------------------------------------------------------------------- + + /** Thread to retrieve data from the {@link CloseableIterator} and process. 
*/ + private class ResultRetrievalThread extends Thread { + public volatile boolean isRunning = true; + + @Override + public void run() { + try { + while (isRunning && result.hasNext()) { + processRecord(result.next()); + } + } catch (RuntimeException e) { + executionException.compareAndSet( + null, new SqlExecutionException("Error while retrieving result.", e)); + } + + // no result anymore + // either the job is done or an error occurred + isRunning = false; + } + } +} diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java index 28fc1eb81cf..57cb04dbc25 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java @@ -20,6 +20,7 @@ package org.apache.flink.table.gateway.service.session; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationManager; import java.io.Closeable; import java.util.Map; @@ -27,6 +28,9 @@ import java.util.Map; /** * Similar to HTTP Session, which could maintain user identity and store user-specific data during * multiple request/response interactions between a client and the gateway server. 
+ * + * <p>TODO: make operation execution in sequence in + * https://issues.apache.org/jira/browse/FLINK-28053 */ public class Session implements Closeable { @@ -53,6 +57,10 @@ public class Session implements Closeable { return sessionContext.getConfigMap(); } + public OperationManager getOperationManager() { + return sessionContext.getOperationManager(); + } + @Override public void close() { sessionContext.close(); diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java index 7d1457bf3d9..77a3097783d 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java @@ -152,7 +152,8 @@ public class SessionManager { defaultContext, sessionId, environment.getSessionEndpointVersion(), - Configuration.fromMap(environment.getSessionConfig())); + Configuration.fromMap(environment.getSessionConfig()), + operationExecutorService); session = new Session(sessionContext); sessions.put(sessionId, session); @@ -211,4 +212,9 @@ public class SessionManager { int currentSessionCount() { return sessions.size(); } + + @VisibleForTesting + public int getOperationCount(SessionHandle sessionHandle) { + return getSession(sessionHandle).getOperationManager().getOperationCount(); + } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java index 048af7401c8..52d74fb882e 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java +++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java @@ -18,21 +18,54 @@ package org.apache.flink.table.gateway.service; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.operation.OperationStatus; +import org.apache.flink.table.gateway.api.operation.OperationType; +import org.apache.flink.table.gateway.api.results.OperationInfo; +import org.apache.flink.table.gateway.api.results.ResultSet; import org.apache.flink.table.gateway.api.session.SessionEnvironment; import org.apache.flink.table.gateway.api.session.SessionHandle; import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.operation.Operation; +import org.apache.flink.table.gateway.service.operation.OperationManager; +import org.apache.flink.table.gateway.service.session.SessionManager; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension; import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.function.RunnableWithException; +import org.assertj.core.api.Assertions; import org.hamcrest.Matchers; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import 
java.util.Map; +import java.util.concurrent.CountDownLatch; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; /** ITCase for {@link SqlGatewayServiceImpl}. */ public class SqlGatewayServiceITCase extends AbstractTestBase { @@ -41,10 +74,17 @@ public class SqlGatewayServiceITCase extends AbstractTestBase { public static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION = new SqlGatewayServiceExtension(); + private static SessionManager sessionManager; private static SqlGatewayServiceImpl service; + private final SessionEnvironment defaultSessionEnvironment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .build(); + @BeforeAll public static void setUp() { + sessionManager = SQL_GATEWAY_SERVICE_EXTENSION.getSessionManager(); service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService(); } @@ -70,4 +110,346 @@ public class SqlGatewayServiceITCase extends AbstractTestBase { actualConfig, Matchers.hasEntry(k, v))); } + + @Test + public void testFetchResultsInRunning() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + + CountDownLatch startRunningLatch = new CountDownLatch(1); + CountDownLatch endRunningLatch = new CountDownLatch(1); + OperationHandle operationHandle = + submitDefaultOperation( + sessionHandle, + () -> { + startRunningLatch.countDown(); + endRunningLatch.await(); + }); + + startRunningLatch.await(); + assertEquals( + 
ResultSet.NOT_READY_RESULTS, + service.fetchResults(sessionHandle, operationHandle, 0, Integer.MAX_VALUE)); + endRunningLatch.countDown(); + } + + @Test + public void testGetOperationFinishedAndFetchResults() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + + CountDownLatch startRunningLatch = new CountDownLatch(1); + CountDownLatch endRunningLatch = new CountDownLatch(1); + + OperationHandle operationHandle = + submitDefaultOperation( + sessionHandle, + () -> { + startRunningLatch.countDown(); + endRunningLatch.await(); + }); + + startRunningLatch.await(); + assertEquals( + new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true), + service.getOperationInfo(sessionHandle, operationHandle)); + + endRunningLatch.countDown(); + OperationInfo expectedInfo = + new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN, true); + + CommonTestUtils.waitUtil( + () -> service.getOperationInfo(sessionHandle, operationHandle).equals(expectedInfo), + Duration.ofSeconds(10), + "Failed to wait operation finish."); + + Long token = 0L; + List<RowData> expectedData = getDefaultResultSet().getData(); + List<RowData> actualData = new ArrayList<>(); + while (token != null) { + ResultSet currentResult = + service.fetchResults(sessionHandle, operationHandle, token, 1); + actualData.addAll(checkNotNull(currentResult.getData())); + token = currentResult.getNextToken(); + } + assertEquals(expectedData, actualData); + + service.closeOperation(sessionHandle, operationHandle); + assertEquals(0, sessionManager.getOperationCount(sessionHandle)); + } + + @Test + public void testCancelOperation() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + + CountDownLatch startRunningLatch = new CountDownLatch(1); + CountDownLatch endRunningLatch = new CountDownLatch(1); + + OperationHandle operationHandle = + submitDefaultOperation( + sessionHandle, + () -> { + 
startRunningLatch.countDown(); + endRunningLatch.await(); + }); + + startRunningLatch.await(); + assertEquals( + new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true), + service.getOperationInfo(sessionHandle, operationHandle)); + + service.cancelOperation(sessionHandle, operationHandle); + + assertEquals( + new OperationInfo(OperationStatus.CANCELED, OperationType.UNKNOWN, true), + service.getOperationInfo(sessionHandle, operationHandle)); + service.closeOperation(sessionHandle, operationHandle); + assertEquals(0, sessionManager.getOperationCount(sessionHandle)); + } + + @Test + public void testOperationGetErrorAndFetchError() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + + CountDownLatch startRunningLatch = new CountDownLatch(1); + + String msg = "Artificial Exception."; + OperationHandle operationHandle = + submitDefaultOperation( + sessionHandle, + () -> { + startRunningLatch.countDown(); + throw new SqlExecutionException(msg); + }); + startRunningLatch.await(); + + CommonTestUtils.waitUtil( + () -> + service.getOperationInfo(sessionHandle, operationHandle) + .getStatus() + .equals(OperationStatus.ERROR), + Duration.ofSeconds(10), + "Failed to get expected operation status."); + + Assertions.assertThatThrownBy( + () -> + service.fetchResults( + sessionHandle, operationHandle, 0, Integer.MAX_VALUE)) + .satisfies(anyCauseMatches(SqlExecutionException.class, msg)); + + service.closeOperation(sessionHandle, operationHandle); + assertEquals(0, sessionManager.getOperationCount(sessionHandle)); + } + + // -------------------------------------------------------------------------------------------- + // Concurrent tests + // -------------------------------------------------------------------------------------------- + + @Test + public void testCancelOperationAndFetchResultInParallel() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + CountDownLatch 
latch = new CountDownLatch(1); + // Make sure cancel the Operation before finish. + OperationHandle operationHandle = submitDefaultOperation(sessionHandle, latch::await); + runCancelOrCloseOperationWhenFetchResults( + sessionHandle, + operationHandle, + () -> service.cancelOperation(sessionHandle, operationHandle), + String.format( + "Can not fetch results from the %s in %s status.", + operationHandle, OperationStatus.CANCELED)); + latch.countDown(); + } + + @Test + public void testCloseOperationAndFetchResultInParallel() { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + OperationHandle operationHandle = + submitDefaultOperation( + sessionHandle, + () -> { + // allow close before execution finish. + Thread.sleep(1); + }); + runCancelOrCloseOperationWhenFetchResults( + sessionHandle, + operationHandle, + () -> service.closeOperation(sessionHandle, operationHandle), + String.format( + "Can not find the submitted operation in the OperationManager with the %s.", + operationHandle)); + } + + @Test + public void testCancelAndCloseOperationInParallel() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + int operationNum = 200; + List<Operation> operations = new ArrayList<>(operationNum); + for (int i = 0; i < operationNum; i++) { + boolean throwError = i % 2 == 0; + OperationHandle operationHandle = + submitDefaultOperation( + sessionHandle, + () -> { + // allow cancel/close before execution finish. 
+ Thread.sleep(100); + if (throwError) { + throw new SqlGatewayException("Artificial Exception."); + } + }); + + operations.add( + service.getSession(sessionHandle) + .getOperationManager() + .getOperation(operationHandle)); + new Thread(() -> service.cancelOperation(sessionHandle, operationHandle)).start(); + new Thread(() -> service.closeOperation(sessionHandle, operationHandle)).start(); + } + + CommonTestUtils.waitUtil( + () -> + service.getSession(sessionHandle).getOperationManager().getOperationCount() + == 0, + Duration.ofSeconds(10), + "All operation should be closed."); + + for (Operation op : operations) { + assertEquals(OperationStatus.CLOSED, op.getOperationInfo().getStatus()); + } + } + + @Test + public void testSubmitOperationAndCloseOperationManagerInParallel() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + OperationManager manager = service.getSession(sessionHandle).getOperationManager(); + int submitThreadsNum = 100; + CountDownLatch latch = new CountDownLatch(submitThreadsNum); + for (int i = 0; i < submitThreadsNum; i++) { + new Thread( + () -> { + try { + submitDefaultOperation(sessionHandle, () -> {}); + } finally { + latch.countDown(); + } + }) + .start(); + } + manager.close(); + latch.await(); + assertEquals(0, manager.getOperationCount()); + } + + // -------------------------------------------------------------------------------------------- + // Negative tests + // -------------------------------------------------------------------------------------------- + + @Test + public void testFetchResultsFromCanceledOperation() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + + CountDownLatch latch = new CountDownLatch(1); + + OperationHandle operationHandle = submitDefaultOperation(sessionHandle, latch::await); + service.cancelOperation(sessionHandle, operationHandle); + assertThatThrownBy( + () -> + service.fetchResults( + sessionHandle, 
operationHandle, 0, Integer.MAX_VALUE)) + .satisfies( + anyCauseMatches( + String.format( + "Can not fetch results from the %s in %s status.", + operationHandle, OperationStatus.CANCELED))); + latch.countDown(); + } + + @Test + public void testRequestNonExistOperation() { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + + OperationHandle operationHandle = OperationHandle.create(); + List<RunnableWithException> requests = + Arrays.asList( + () -> service.cancelOperation(sessionHandle, operationHandle), + () -> service.getOperationInfo(sessionHandle, operationHandle), + () -> + service.fetchResults( + sessionHandle, operationHandle, 0, Integer.MAX_VALUE)); + + for (RunnableWithException request : requests) { + assertThatThrownBy(request::run) + .satisfies( + anyCauseMatches( + String.format( + "Can not find the submitted operation in the OperationManager with the %s.", + operationHandle))); + } + } + + // -------------------------------------------------------------------------------------------- + + private OperationHandle submitDefaultOperation( + SessionHandle sessionHandle, RunnableWithException executor) { + return service.submitOperation( + sessionHandle, + OperationType.UNKNOWN, + () -> { + executor.run(); + return getDefaultResultSet(); + }); + } + + private ResultSet getDefaultResultSet() { + List<RowData> data = + Arrays.asList( + GenericRowData.ofKind(INSERT, 1L, StringData.fromString("Flink CDC"), 3), + GenericRowData.ofKind(INSERT, 2L, StringData.fromString("MySql"), null), + GenericRowData.ofKind(DELETE, 1, null, null), + GenericRowData.ofKind(UPDATE_AFTER, 2, null, 101)); + return new ResultSet( + ResultSet.ResultType.PAYLOAD, + null, + ResolvedSchema.of( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("name", DataTypes.STRING()), + Column.physical("age", DataTypes.INT())), + data); + } + + private void runCancelOrCloseOperationWhenFetchResults( + SessionHandle sessionHandle, + OperationHandle 
operationHandle, + RunnableWithException cancelOrClose, + String errorMsg) { + + List<RowData> actual = new ArrayList<>(); + new Thread( + () -> { + try { + cancelOrClose.run(); + } catch (Exception e) { + // ignore + } + }) + .start(); + + assertThatThrownBy( + () -> { + Long token = 0L; + while (token != null) { + ResultSet resultSet = + service.fetchResults( + sessionHandle, + operationHandle, + token, + Integer.MAX_VALUE); + token = resultSet.getNextToken(); + if (resultSet.getResultType() == ResultSet.ResultType.PAYLOAD) { + actual.addAll(resultSet.getData()); + } + } + }) + .satisfies(anyCauseMatches(errorMsg)); + + assertTrue(new HashSet<>(getDefaultResultSet().getData()).containsAll(actual)); + } } diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java new file mode 100644 index 00000000000..fc228241613 --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/result/ResultFetcherTest.java @@ -0,0 +1,364 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.core.testutils.FlinkAssertions; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.api.operation.OperationHandle; +import org.apache.flink.table.gateway.api.results.ResultSet; +import org.apache.flink.table.gateway.api.utils.SqlGatewayException; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; + +import org.apache.commons.collections.iterators.IteratorChain; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** Test for {@link ResultFetcher}. 
*/ +public class ResultFetcherTest { + + private static ResolvedSchema schema; + private static List<RowData> data; + + @BeforeAll + public static void setUp() { + schema = + ResolvedSchema.of( + Column.physical("boolean", DataTypes.BOOLEAN()), + Column.physical("int", DataTypes.INT()), + Column.physical("bigint", DataTypes.BIGINT()), + Column.physical("varchar", DataTypes.STRING()), + Column.physical("decimal(10, 5)", DataTypes.DECIMAL(10, 5)), + Column.physical( + "timestamp", DataTypes.TIMESTAMP(6).bridgedTo(Timestamp.class)), + Column.physical("binary", DataTypes.BYTES())); + data = + Arrays.asList( + GenericRowData.ofKind( + RowKind.INSERT, + null, + 1, + 2L, + "abc", + BigDecimal.valueOf(1.23), + Timestamp.valueOf("2020-03-01 18:39:14"), + new byte[] {50, 51, 52, -123, 54, 93, 115, 126}), + GenericRowData.ofKind( + RowKind.UPDATE_BEFORE, + false, + null, + 0L, + "", + BigDecimal.valueOf(1), + Timestamp.valueOf("2020-03-01 18:39:14.1"), + new byte[] {100, -98, 32, 121, -125}), + GenericRowData.ofKind( + RowKind.UPDATE_AFTER, + true, + Integer.MAX_VALUE, + null, + "abcdefg", + BigDecimal.valueOf(12345), + Timestamp.valueOf("2020-03-01 18:39:14.12"), + new byte[] {-110, -23, 1, 2}), + GenericRowData.ofKind( + RowKind.DELETE, + false, + Integer.MIN_VALUE, + Long.MAX_VALUE, + null, + BigDecimal.valueOf(12345.06789), + Timestamp.valueOf("2020-03-01 18:39:14.123"), + new byte[] {50, 51, 52, -123, 54, 93, 115, 126}), + GenericRowData.ofKind( + RowKind.INSERT, + true, + 100, + Long.MIN_VALUE, + "abcdefg111", + null, + Timestamp.valueOf("2020-03-01 18:39:14.123456"), + new byte[] {110, 23, -1, -2}), + GenericRowData.ofKind( + RowKind.DELETE, + null, + -1, + -1L, + "abcdefghijklmnopqrstuvwxyz", + BigDecimal.valueOf(-12345.06789), + null, + null), + GenericRowData.ofKind( + RowKind.INSERT, + null, + -1, + -1L, + "这是一段中文", + BigDecimal.valueOf(-12345.06789), + Timestamp.valueOf("2020-03-04 18:39:14"), + new byte[] {-3, -2, -1, 0, 1, 2, 3}), + GenericRowData.ofKind( + 
RowKind.DELETE, + null, + -1, + -1L, + "これは日本語をテストするための文です", + BigDecimal.valueOf(-12345.06789), + Timestamp.valueOf("2020-03-04 18:39:14"), + new byte[] {-3, -2, -1, 0, 1, 2, 3})); + } + + @Test + public void testFetchResultsMultipleTimesWithLimitedBufferSize() { + int bufferSize = data.size() / 2; + ResultFetcher fetcher = + buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize); + + runFetchMultipleTimes(fetcher, bufferSize, data.size()); + } + + @Test + public void testFetchResultsMultipleTimesWithLimitedFetchSize() { + int bufferSize = data.size(); + ResultFetcher fetcher = + buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize); + + runFetchMultipleTimes(fetcher, bufferSize, data.size() / 2); + } + + @Test + public void testFetchResultInParallel() throws Exception { + int bufferSize = data.size() / 2; + ResultFetcher fetcher = + buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize); + + AtomicReference<Boolean> isEqual = new AtomicReference<>(true); + int fetchThreadNum = 100; + CountDownLatch latch = new CountDownLatch(fetchThreadNum); + + CommonTestUtils.waitUtil( + () -> fetcher.getResultStore().getBufferedRecordSize() > 0, + Duration.ofSeconds(10), + "Failed to wait the buffer has data."); + List<RowData> firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE).getData(); + for (int i = 0; i < fetchThreadNum; i++) { + new Thread( + () -> { + ResultSet resultSet = fetcher.fetchResults(0, Integer.MAX_VALUE); + + if (!firstFetch.equals(resultSet.getData())) { + isEqual.set(false); + } + latch.countDown(); + }) + .start(); + } + + latch.await(); + assertEquals(true, isEqual.get()); + } + + @Test + public void testFetchResultAfterClose() throws Exception { + ResultFetcher fetcher = + buildResultFetcher(Collections.singletonList(data.iterator()), data.size() + 1); + List<RowData> actual = Collections.emptyList(); + long token = 0L; + + while (actual.size() < 1) { + // fill the fetcher buffer + 
ResultSet resultSet = fetcher.fetchResults(token, 1); + token = checkNotNull(resultSet.getNextToken()); + actual = resultSet.getData(); + } + assertEquals(data.subList(0, 1), actual); + fetcher.close(); + + long testToken = token; + AtomicReference<Boolean> meetEnd = new AtomicReference<>(false); + new Thread( + () -> { + // Should meet EOS in the end. + long nextToken = testToken; + while (true) { + ResultSet resultSet = + fetcher.fetchResults(nextToken, Integer.MAX_VALUE); + if (resultSet.getResultType() == ResultSet.ResultType.EOS) { + break; + } + nextToken = checkNotNull(resultSet.getNextToken()); + } + meetEnd.set(true); + }) + .start(); + + CommonTestUtils.waitUtil( + meetEnd::get, + Duration.ofSeconds(10), + "Should get EOS when fetch results from the closed fetcher."); + } + + @Test + public void testFetchResultWithToken() { + ResultFetcher fetcher = + buildResultFetcher(Collections.singletonList(data.iterator()), data.size()); + Long nextToken = 0L; + List<RowData> actual = new ArrayList<>(); + ResultSet resultSetBefore = null; + while (nextToken != null) { + if (resultSetBefore != null) { + assertEquals(resultSetBefore, fetcher.fetchResults(nextToken - 1, data.size())); + } + + ResultSet resultSet = fetcher.fetchResults(nextToken, data.size()); + ResultSet resultSetWithSameToken = fetcher.fetchResults(nextToken, data.size()); + + assertEquals(resultSet, resultSetWithSameToken); + if (resultSet.getResultType() == ResultSet.ResultType.EOS) { + break; + } + resultSetBefore = resultSet; + + actual.addAll(checkNotNull(resultSet.getData())); + nextToken = resultSet.getNextToken(); + } + + assertEquals(data, actual); + } + + @Test + public void testFetchFailedResult() { + String message = "Artificial Exception"; + ResultFetcher fetcher = + buildResultFetcher( + Arrays.asList(new ErrorIterator(message), data.iterator()), data.size()); + + assertThatThrownBy( + () -> { + Long token = 0L; + while (token != null) { + // Use loop to fetch results from the 
ErrorIterator + token = + fetcher.fetchResults(token, Integer.MAX_VALUE) + .getNextToken(); + } + }) + .satisfies(FlinkAssertions.anyCauseMatches(message)); + } + + @Test + public void testFetchIllegalToken() { + ResultFetcher fetcher = + buildResultFetcher(Collections.singletonList(data.iterator()), data.size()); + assertThatThrownBy(() -> fetcher.fetchResults(2, Integer.MAX_VALUE)) + .satisfies(FlinkAssertions.anyCauseMatches("Expecting token to be 0, but found 2")); + } + + @Test + public void testFetchBeforeWithDifferentSize() throws Exception { + ResultFetcher fetcher = + buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2); + CommonTestUtils.waitUtil( + () -> fetcher.getResultStore().getBufferedRecordSize() > 1, + Duration.ofSeconds(10), + "Failed to make cached records num larger than 1."); + + ResultSet firstFetch = fetcher.fetchResults(0, Integer.MAX_VALUE); + int firstFetchSize = firstFetch.getData().size(); + assertThatThrownBy(() -> fetcher.fetchResults(0, firstFetchSize - 1)) + .satisfies( + FlinkAssertions.anyCauseMatches( + String.format( + "As the same token is provided, fetch size must be not less than the previous returned buffer size." 
+ + " Previous returned result size is %s, current max_fetch_size to be %s.", + firstFetch.getData().size(), firstFetchSize - 1))); + } + + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("unchecked") + private ResultFetcher buildResultFetcher(List<Iterator<RowData>> rows, int bufferSize) { + OperationHandle operationHandle = OperationHandle.create(); + return new ResultFetcher( + operationHandle, + schema, + CloseableIterator.adapterForIterator(new IteratorChain(rows)), + bufferSize); + } + + private void runFetchMultipleTimes(ResultFetcher fetcher, int bufferSize, int fetchSize) { + List<RowData> fetchedRows = new ArrayList<>(); + ResultSet currentResult = null; + Long token = 0L; + + while (token != null) { + currentResult = fetcher.fetchResults(token, fetchSize); + assertTrue( + checkNotNull(currentResult.getData()).size() + <= Math.min(bufferSize, fetchSize)); + token = currentResult.getNextToken(); + fetchedRows.addAll(currentResult.getData()); + } + + assertEquals(ResultSet.ResultType.EOS, checkNotNull(currentResult).getResultType()); + assertEquals(data, fetchedRows); + } + + // -------------------------------------------------------------------------------------------- + + private static class ErrorIterator implements Iterator<RowData> { + + private final String errorMsg; + + public ErrorIterator(String errorMsg) { + this.errorMsg = errorMsg; + } + + @Override + public boolean hasNext() { + return true; + } + + @Override + public RowData next() { + throw new SqlGatewayException(errorMsg); + } + } +} diff --git a/flink-table/flink-sql-gateway/src/test/resources/log4j2-test.properties b/flink-table/flink-sql-gateway/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000000..835c2ec9a3d --- /dev/null +++ b/flink-table/flink-sql-gateway/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ 
+################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level = OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n