wuchong commented on code in PR #19823: URL: https://github.com/apache/flink/pull/19823#discussion_r895303674
########## flink-table/flink-sql-gateway/pom.xml: ########## @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>flink-table</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.16-SNAPSHOT</version> + </parent> + + <artifactId>flink-sql-gateway</artifactId> + <name>Flink : Table : SQL Gateway</name> + + <dependencies> + <!-- Flink client to submit jobs --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Table ecosystem --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-gateway-common</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-sql-gateway-common</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Build flink-sql-gateway jar --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <executions> + <!-- Exclude all flink-dist files and only contain sqlserver files --> Review Comment: ```suggestion <!-- Exclude all flink-dist files and only contain sql-gateway files --> ``` ########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.gateway.common.SqlGatewayService; + +import java.time.Duration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Config options of the {@link SqlGatewayService}. */ +@PublicEvolving +public class SqlGatewayServiceConfigOptions { + + public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT = + key("sql-gateway.session.idle-timeout") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "Timeout interval for closing the session when the session hasn't been accessed during the interval. " + + "If setting to zero or negative value, the session will not be closed."); + + public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL = + key("sql-gateway.session.check-interval") + .durationType() + .defaultValue(Duration.ofMinutes(1)) + .withDescription( + "The check interval for idle session timeout, which can be disabled by setting to zero or negative value."); + + public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM = + key("sql-gateway.session.max-num") + .intType() + .defaultValue(1000000) + .withDescription( + "The check interval for session timeout, which can be disabled by setting to zero or negative value."); + + public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX = + key("sql-gateway.worker.threads.max") + .intType() + .defaultValue(500) + .withDescription("Maximum number of worker threads for gateway GRPC server."); + + public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MIN = + key("sql-gateway.worker.threads.min") + .intType() + .defaultValue(5) + .withDescription("Minimum number of worker threads for gateway GRPC server."); Review Comment: Same to above. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java: ########## @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.context; + +import org.apache.flink.client.ClientUtils; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.TableException; +import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl; +import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.catalog.CatalogManager; +import org.apache.flink.table.catalog.FunctionCatalog; +import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.delegation.Executor; +import org.apache.flink.table.delegation.ExecutorFactory; +import org.apache.flink.table.delegation.Planner; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.PlannerFactoryUtil; +import org.apache.flink.table.gateway.common.endpoint.EndpointVersion; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.service.operation.OperationManager; +import org.apache.flink.table.module.ModuleManager; +import org.apache.flink.util.TemporaryClassLoaderContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutorService; + +/** + * Context describing a session, it's mainly used for user to open a new session in the backend. If + * client request to open a new session, the backend {@code Executor} will maintain the session + * context map util users close it. + */ +public class SessionContext { + + private static final Logger LOG = LoggerFactory.getLogger(SessionContext.class); + + private final SessionHandle sessionId; + private final EndpointVersion endpointVersion; + + // store all options and use Configuration to build SessionState and TableConfig. + private final Configuration sessionConf; + private final SessionState sessionState; + private final URLClassLoader userClassloader; + + private final OperationManager operationManager; + + private SessionContext( + SessionHandle sessionId, + EndpointVersion endpointVersion, + Configuration sessionConf, + URLClassLoader classLoader, + SessionState sessionState, + OperationManager operationManager) { + this.sessionId = sessionId; + this.endpointVersion = endpointVersion; + this.sessionConf = sessionConf; + this.userClassloader = classLoader; + this.sessionState = sessionState; + this.operationManager = operationManager; + } + + // -------------------------------------------------------------------------------------------- + // Getter method + // -------------------------------------------------------------------------------------------- + + public SessionHandle getSessionId() { + return this.sessionId; + } + + public Map<String, String> getConfigMap() { + return sessionConf.toMap(); + } + + // -------------------------------------------------------------------------------------------- + // Method to execute commands + // -------------------------------------------------------------------------------------------- + + /** Close resources, e.g. 
catalogs. */ + public void close() { + try (TemporaryClassLoaderContext ignored = + TemporaryClassLoaderContext.of(userClassloader)) { Review Comment: Currently, FLINK-15635 has been resolved. Is it possible to hand the `userClassloader` over to TableEnv for management in this PR? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/Operation.java: ########## @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ExecutionResult; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +/** Entity that contains the resource to run and the execution results. 
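A minimal sketch of what handing the classloader over could look like, assuming the `EnvironmentSettings.Builder#withClassLoader` method added by FLINK-15635 is available on this branch (the PR currently wires the planner and executor manually, so this only illustrates the direction, not a drop-in change):

```java
// Sketch: let the TableEnvironment own and use the session classloader itself,
// instead of wrapping each call site in TemporaryClassLoaderContext.
// Assumes EnvironmentSettings.Builder#withClassLoader from FLINK-15635.
EnvironmentSettings settings =
        EnvironmentSettings.newInstance()
                .withConfiguration(sessionConf)
                .withClassLoader(userClassloader)
                .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
```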
*/ +public class Operation { + + private static final Logger LOG = LoggerFactory.getLogger(Operation.class); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private final OperationHandle operationHandle; + + private final OperationType operationType; + private final boolean hasResults; + private OperationStatus status; + + private final Callable<ExecutionResult> resultSupplier; + + private Future<?> invocation; + private ExecutionResult operationResult; + private SqlExecutionException operationError; + + public Operation( + OperationHandle operationHandle, + OperationType operationType, + Callable<ExecutionResult> resultSupplier) { + this.operationHandle = operationHandle; + this.status = OperationStatus.INITIALIZED; + this.operationType = operationType; + this.hasResults = true; + this.resultSupplier = resultSupplier; + } + + void runBefore() { + updateState(OperationStatus.PENDING); + } + + void runAfter() { + updateState(OperationStatus.FINISHED); + } + + public void run(ExecutorService service) { + invocation = + service.submit( + () -> { + try { + runBefore(); + updateState(OperationStatus.RUNNING); + ExecutionResult result = resultSupplier.call(); + writeLock(() -> operationResult = result); + runAfter(); + } catch (Exception e) { + String msg = + String.format( + "Failed to execute the operation %s.", + operationHandle); + LOG.error(msg, e); + writeLock( + () -> { + updateState(OperationStatus.ERROR); + operationError = new SqlExecutionException(msg, e); + }); + } + }); + } + + public void cancel() { + writeLock( + () -> { + if (invocation != null && !invocation.isDone()) { + invocation.cancel(true); + } + + if (operationResult != null) { + operationResult.close(); + operationResult = null; + } + + updateState(OperationStatus.CANCELED); + }); + } + + public void close() { + writeLock( + () -> { + if (invocation != null && !invocation.isDone()) { + invocation.cancel(true); + } + + if (operationResult != null) { + operationResult.close(); + operationResult = null; + } + + updateState(OperationStatus.CLOSED); + }); + } + + public ResultSet fetchResults(long token, int maxRows) { + OperationStatus currentStatus = readLock(() -> status); + + if (currentStatus == OperationStatus.ERROR) { + throw operationError; + } else if (currentStatus == OperationStatus.FINISHED) { + return operationResult.fetchResults(token, maxRows); + } else { + throw new SqlGatewayException( + String.format( + "Can not fetch results from the %s in %s status.", + operationHandle, currentStatus)); + } + } + + public OperationInfo getOperationInfo() { + return readLock(() -> new OperationInfo(status, operationType, hasResults)); + } + + private void updateState(OperationStatus toStatus) { + writeLock( + () -> { + boolean isValid = OperationStatus.isValidStatusTranslation(status, toStatus); + if (!isValid) { + String message = + String.format( + "Failed to convert the Operation Status from %s to %s for %s.", + status, toStatus, operationHandle); + LOG.error(message); + throw new SqlGatewayException(message); + } + LOG.info( + String.format( + "Convert operation %s from %s to %s.", Review Comment: The logs might be verbose using the current `OperationHandle#toString`. Can we simplify the operation handle string in logs? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
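If the goal is just shorter log lines, one option is to log only the handle's identifier and use parameterized logging instead of `String.format`. The sketch assumes `OperationHandle` exposes its UUID through something like a `getIdentifier()` accessor; otherwise a small formatting helper would be needed:

```java
// Hypothetical sketch: log only the handle's identifier rather than the full
// OperationHandle#toString output, and let SLF4J do the formatting lazily.
LOG.info(
        "Convert operation {} from {} to {}.",
        operationHandle.getIdentifier(),
        status,
        toStatus);
```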
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationManager; + +import java.io.Closeable; +import java.util.Map; + +/** Session that manages the registered resource including jars, registered table. */ +public class Session implements Closeable { + + private final SessionContext sessionContext; + private long lastAccessTime; Review Comment: How can we guarantee the thread-safe of Session? For example, there might be concurrency requests for the same session, they may change session configuration at the same time. The other case is that `lastAccessTime` can be accessed by expire thread and worker thread at the same time. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
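A minimal sketch of the `volatile` approach for the access-time bookkeeping; it only addresses visibility of `lastAccessTime` between the expiration checker and worker threads, while concurrent changes to the session configuration would still need their own synchronization inside `SessionContext`:

```java
import java.io.Closeable;

public class Session implements Closeable {

    private final SessionContext sessionContext;

    // volatile so the expire thread always sees the latest value written by
    // worker threads without taking a lock on every access.
    private volatile long lastAccessTime;

    public Session(SessionContext sessionContext) {
        this.sessionContext = sessionContext;
        this.lastAccessTime = System.currentTimeMillis();
    }

    public void touch() {
        lastAccessTime = System.currentTimeMillis();
    }

    public long getLastAccessTime() {
        return lastAccessTime;
    }

    @Override
    public void close() {
        sessionContext.close();
    }
}
```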
+ */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.utils.Constants; +import org.apache.flink.table.gateway.service.utils.ThreadUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN; + +/** Manage the lifecycle of the {@code Session}. 
*/ +public class SessionManager { + + private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class); + + private final DefaultContext defaultContext; + + private final long idleTimeout; + private final long checkInterval; + private final int maxSessionCount; + + private final Map<SessionHandle, Session> sessions; + + private ExecutorService operationExecutorService; + private ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture<?> timeoutCheckerFuture; + + public SessionManager(DefaultContext defaultContext) { + this.defaultContext = defaultContext; + ReadableConfig conf = defaultContext.getFlinkConfig(); + this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis(); + this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis(); + this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM); + this.sessions = new ConcurrentHashMap<>(); + } + + public void start() { + if (checkInterval > 0 && idleTimeout > 0) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + timeoutCheckerFuture = + scheduledExecutorService.scheduleAtFixedRate( + () -> { + LOG.debug( + "Start to cleanup expired sessions, current session count: {}", + sessions.size()); + for (Map.Entry<SessionHandle, Session> entry : + sessions.entrySet()) { + SessionHandle sessionId = entry.getKey(); + Session session = entry.getValue(); + if (isSessionExpired(session)) { + LOG.info("Session {} is expired, close it...", sessionId); Review Comment: ```suggestion LOG.info("Session {} is expired, closing it...", sessionId); ``` ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.gateway.common.SqlGatewayService; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.service.session.SessionManager; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestResource; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.RunnableWithException; + +import org.assertj.core.api.Assertions; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** ITCase for {@link SqlGatewayServiceImpl}. 
*/ +public class SqlGatewayServiceITCase extends AbstractTestBase { + + @ClassRule + public static final SqlGatewayServiceTestResource SQL_GATEWAY_SERVICE_RESOURCE = + new SqlGatewayServiceTestResource(); + + private static SessionManager sessionManager; + private static SqlGatewayService service; + + private final SessionEnvironment defaultSessionEnvironment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .build(); + + @BeforeClass + public static void setup() { + sessionManager = SQL_GATEWAY_SERVICE_RESOURCE.getSessionManager(); + service = SQL_GATEWAY_SERVICE_RESOURCE.getService(); + } + + @Test + public void testOpenSessionWithConfig() { + Map<String, String> options = new HashMap<>(); + options.put("key1", "val1"); + options.put("key2", "val2"); + SessionEnvironment environment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .addSessionConfig(options) + .build(); + + SessionHandle sessionHandle = service.openSession(environment); + Map<String, String> actualConfig = service.getSessionConfig(sessionHandle); + + for (String key : options.keySet()) { + assertEquals(options.get(key), actualConfig.get(key)); + } + } + + @Test + public void testGetOperationFinishedAndFetchResults() throws Exception { + SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment); + + CountDownLatch startRunningLatch = new CountDownLatch(1); + CountDownLatch endRunningLatch = new CountDownLatch(1); + + OperationHandle operationHandle = + submitDefaultOperation( + sessionHandle, + () -> { + startRunningLatch.countDown(); + endRunningLatch.await(); + }); + + startRunningLatch.await(); + assertEquals( + new OperationInfo(OperationStatus.RUNNING, OperationType.UNKNOWN, true), + service.getOperationInfo(sessionHandle, operationHandle)); + + endRunningLatch.countDown(); + Thread.sleep(1000); Review Comment: Maybe we should use iterations here. Sleep is still not safe when CPU load is high. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
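One way to avoid the fixed sleep is to poll for the expected terminal state with a deadline. The sketch below hand-rolls the loop (a helper such as `CommonTestUtils.waitUntilCondition` could be used instead if its signature on this branch fits), and the expected `OperationInfo` values are illustrative:

```java
// Poll until the operation reports FINISHED instead of sleeping a fixed time.
// Deadline is org.apache.flink.api.common.time.Deadline.
OperationInfo expected =
        new OperationInfo(OperationStatus.FINISHED, OperationType.UNKNOWN, true);
Deadline deadline = Deadline.fromNow(Duration.ofSeconds(10));
while (deadline.hasTimeLeft()
        && !expected.equals(service.getOperationInfo(sessionHandle, operationHandle))) {
    Thread.sleep(100);
}
assertEquals(expected, service.getOperationInfo(sessionHandle, operationHandle));
```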
+ */ + +package org.apache.flink.table.gateway.service; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.gateway.common.SqlGatewayService; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationStatus; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.MockedEndpointVersion; +import org.apache.flink.table.gateway.service.session.SessionManager; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestResource; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.RunnableWithException; + +import org.assertj.core.api.Assertions; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** ITCase for {@link SqlGatewayServiceImpl}. 
*/ +public class SqlGatewayServiceITCase extends AbstractTestBase { + + @ClassRule + public static final SqlGatewayServiceTestResource SQL_GATEWAY_SERVICE_RESOURCE = + new SqlGatewayServiceTestResource(); + + private static SessionManager sessionManager; + private static SqlGatewayService service; + + private final SessionEnvironment defaultSessionEnvironment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .build(); + + @BeforeClass + public static void setup() { + sessionManager = SQL_GATEWAY_SERVICE_RESOURCE.getSessionManager(); + service = SQL_GATEWAY_SERVICE_RESOURCE.getService(); + } + + @Test + public void testOpenSessionWithConfig() { + Map<String, String> options = new HashMap<>(); + options.put("key1", "val1"); + options.put("key2", "val2"); + SessionEnvironment environment = + SessionEnvironment.newBuilder() + .setSessionEndpointVersion(MockedEndpointVersion.V1) + .addSessionConfig(options) + .build(); + + SessionHandle sessionHandle = service.openSession(environment); + Map<String, String> actualConfig = service.getSessionConfig(sessionHandle); + + for (String key : options.keySet()) { + assertEquals(options.get(key), actualConfig.get(key)); + } Review Comment: Can't use `assertEquals` on the Map? ########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.gateway.common.SqlGatewayService; + +import java.time.Duration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Config options of the {@link SqlGatewayService}. */ +@PublicEvolving +public class SqlGatewayServiceConfigOptions { + + public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT = + key("sql-gateway.session.idle-timeout") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "Timeout interval for closing the session when the session hasn't been accessed during the interval. 
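If the returned config is expected to contain exactly the supplied options, a plain map comparison works; if it may also carry defaults, an entry-subset assertion keeps the intent of the loop. Both variants below are sketches using assertion utilities the test already depends on:

```java
// Exact match:
assertEquals(options, actualConfig);

// Subset match (AssertJ), if actualConfig may also contain default entries:
Assertions.assertThat(actualConfig).containsAllEntriesOf(options);
```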
" + + "If setting to zero or negative value, the session will not be closed."); + + public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL = + key("sql-gateway.session.check-interval") + .durationType() + .defaultValue(Duration.ofMinutes(1)) + .withDescription( + "The check interval for idle session timeout, which can be disabled by setting to zero or negative value."); + + public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM = + key("sql-gateway.session.max-num") + .intType() + .defaultValue(1000000) + .withDescription( + "The check interval for session timeout, which can be disabled by setting to zero or negative value."); + + public static final ConfigOption<Integer> SQL_GATEWAY_WORKER_THREADS_MAX = + key("sql-gateway.worker.threads.max") + .intType() + .defaultValue(500) + .withDescription("Maximum number of worker threads for gateway GRPC server."); Review Comment: We shouldn't assume gateway uses gRPC underlying. ########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/results/ResultSet.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common.results; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** The collection of the results. */ +@PublicEvolving +public class ResultSet { + + private final ResultType resultType; + + private final Long nextToken; + + private final ResolvedSchema resultSchema; + private final List<RowData> data; + + public ResultSet( + ResultType resultType, + Long nextToken, + ResolvedSchema resultSchema, + List<RowData> data) { + this.nextToken = nextToken; + this.resultType = resultType; + this.resultSchema = resultSchema; + this.data = data; + } + + public ResultType getResultType() { Review Comment: Please add Javadocs for all the public API methods. ########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/config/SqlGatewayServiceConfigOptions.java: ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common.config; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.gateway.common.SqlGatewayService; + +import java.time.Duration; + +import static org.apache.flink.configuration.ConfigOptions.key; + +/** Config options of the {@link SqlGatewayService}. */ +@PublicEvolving +public class SqlGatewayServiceConfigOptions { + + public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_IDLE_TIMEOUT = + key("sql-gateway.session.idle-timeout") + .durationType() + .defaultValue(Duration.ofMinutes(10)) + .withDescription( + "Timeout interval for closing the session when the session hasn't been accessed during the interval. " + + "If setting to zero or negative value, the session will not be closed."); + + public static final ConfigOption<Duration> SQL_GATEWAY_SESSION_CHECK_INTERVAL = + key("sql-gateway.session.check-interval") + .durationType() + .defaultValue(Duration.ofMinutes(1)) + .withDescription( + "The check interval for idle session timeout, which can be disabled by setting to zero or negative value."); + + public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM = + key("sql-gateway.session.max-num") + .intType() + .defaultValue(1000000) + .withDescription( + "The check interval for session timeout, which can be disabled by setting to zero or negative value."); Review Comment: The description is not correct for session number. ########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/results/ResultSet.java: ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common.results; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** The collection of the results. 
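A possible corrected description for the session limit (the wording is only a suggestion):

```java
public static final ConfigOption<Integer> SQL_GATEWAY_SESSION_MAX_NUM =
        key("sql-gateway.session.max-num")
                .intType()
                .defaultValue(1000000)
                .withDescription(
                        "The maximum number of active sessions the gateway keeps open at the same time.");
```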
*/ +@PublicEvolving +public class ResultSet { + + private final ResultType resultType; + + private final Long nextToken; + + private final ResolvedSchema resultSchema; + private final List<RowData> data; + + public ResultSet( + ResultType resultType, + Long nextToken, + ResolvedSchema resultSchema, + List<RowData> data) { + this.nextToken = nextToken; + this.resultType = resultType; + this.resultSchema = resultSchema; + this.data = data; + } + + public ResultType getResultType() { + return resultType; + } + + public Long getNextToken() { Review Comment: Is the return value nullable? If yes, please add `@Nullable` annotation and explain the meaning of null value. ########## flink-table/flink-sql-gateway-common/src/main/java/org/apache/flink/table/gateway/common/session/SessionEnvironment.java: ########## @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.common.session; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.gateway.common.endpoint.EndpointVersion; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** Environment to initialize the {@code Session}. */ +@PublicEvolving +public class SessionEnvironment { + private final @Nullable String sessionName; + private final EndpointVersion version; + private final List<String> libs; + private final List<String> jars; + private final Map<String, String> sessionConfig; + + @VisibleForTesting + SessionEnvironment( + @Nullable String sessionName, + EndpointVersion version, + List<String> libs, + List<String> jars, Review Comment: Is there any sanity check for the lib and jars? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
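Sketches for the two points above. The first assumes that `null` is indeed used to signal that no further fetch is possible, as the EOS handling in `ResultFetcher` suggests; the second shows one possible sanity check for the lists passed into `SessionEnvironment`:

```java
// Assumes javax.annotation.Nullable is added to the imports of ResultSet.
/**
 * Returns the token to use for the next fetch request, or {@code null} if no
 * more results can be fetched for this operation.
 */
public @Nullable Long getNextToken() {
    return nextToken;
}
```

```java
// Possible sanity check in the SessionEnvironment constructor: reject nulls and
// keep the collections immutable from the caller's perspective.
this.libs = Collections.unmodifiableList(new ArrayList<>(Preconditions.checkNotNull(libs)));
this.jars = Collections.unmodifiableList(new ArrayList<>(Preconditions.checkNotNull(jars)));
```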
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.utils.Constants; +import org.apache.flink.table.gateway.service.utils.ThreadUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN; + +/** Manage the lifecycle of the {@code Session}. 
*/ +public class SessionManager { + + private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class); + + private final DefaultContext defaultContext; + + private final long idleTimeout; + private final long checkInterval; + private final int maxSessionCount; + + private final Map<SessionHandle, Session> sessions; + + private ExecutorService operationExecutorService; + private ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture<?> timeoutCheckerFuture; + + public SessionManager(DefaultContext defaultContext) { + this.defaultContext = defaultContext; + ReadableConfig conf = defaultContext.getFlinkConfig(); + this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis(); + this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis(); + this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM); + this.sessions = new ConcurrentHashMap<>(); + } + + public void start() { + if (checkInterval > 0 && idleTimeout > 0) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + timeoutCheckerFuture = + scheduledExecutorService.scheduleAtFixedRate( + () -> { + LOG.debug( + "Start to cleanup expired sessions, current session count: {}", + sessions.size()); + for (Map.Entry<SessionHandle, Session> entry : + sessions.entrySet()) { + SessionHandle sessionId = entry.getKey(); + Session session = entry.getValue(); + if (isSessionExpired(session)) { + LOG.info("Session {} is expired, close it...", sessionId); + closeSession(session); + } + } + LOG.debug( + "Removing expired session finished, current session count: {}", + sessions.size()); + }, + checkInterval, + checkInterval, + TimeUnit.MILLISECONDS); + } + + ReadableConfig conf = defaultContext.getFlinkConfig(); + operationExecutorService = + ThreadUtils.newDaemonQueuedThreadPool( + conf.get(SQL_GATEWAY_WORKER_THREADS_MIN), + conf.get(SQL_GATEWAY_WORKER_THREADS_MAX), + conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(), + Constants.OPERATION_POOL_NAME); + } + + public void stop() { + if (scheduledExecutorService != null) { + timeoutCheckerFuture.cancel(true); + scheduledExecutorService.shutdown(); + } + if (operationExecutorService != null) { + operationExecutorService.shutdown(); + } + LOG.info("SessionManager is stopped."); + } + + public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException { + Session session = sessions.get(sessionHandle); + if (session == null) { + String msg = String.format("Session '%s' does not exist.", sessionHandle); + LOG.warn(msg); + throw new SqlGatewayException(msg); + } + session.touch(); + return session; + } + + /** + * Register the session into the {@link SessionManager}. + * + * <p>Use synchronized to keep the checkSessionCount and build the Session are atomic. 
+ */ + public synchronized Session openSession(SessionEnvironment environment) + throws SqlGatewayException { + // check session limit + checkSessionCount(); + + Session session = null; + SessionHandle sessionId = null; + do { + sessionId = SessionHandle.create(); + } while (sessions.containsKey(sessionId)); + + SessionContext sessionContext = + SessionContext.create( + defaultContext, + sessionId, + environment.getSessionEndpointVersion(), + Configuration.fromMap(environment.getSessionConfig()), + operationExecutorService); + session = new Session(sessionContext); + sessions.put(sessionId, session); + + LOG.info( + "Session {} is opened, current sessions: {}.", + session.getSessionHandle(), + sessions.size()); + + return session; + } + + public void closeSession(SessionHandle sessionId) throws SqlGatewayException { + Session session = getSession(sessionId); + closeSession(session); + } + + // ------------------------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------------------------ + + private void checkSessionCount() throws SqlGatewayException { + if (maxSessionCount <= 0) { + return; + } + if (sessions.size() >= maxSessionCount) { + String msg = + String.format( + "Failed to create session, the count of active sessions exceeds the max count: %s", + maxSessionCount); + LOG.warn(msg); + throw new SqlGatewayException(msg); + } + } + + private boolean isSessionExpired(Session session) { + if (idleTimeout > 0) { + return (System.currentTimeMillis() - session.getLastAccessTime()) > idleTimeout; + } else { + return false; + } + } + + private void closeSession(Session session) { + SessionHandle sessionId = session.getSessionHandle(); + sessions.remove(sessionId); + session.close(); + LOG.info("Session: {} is closed.", sessionId); Review Comment: Simplify the session id toString. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ExecutionResult; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +/** Manager for the {@link Operation}. */ +public class OperationManager { + + private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class); + + private final Map<OperationHandle, Operation> submittedOperations; + private final ExecutorService service; + + public OperationManager(ExecutorService service) { + this.service = service; + submittedOperations = new ConcurrentHashMap<>(); + } + + public OperationHandle submitOperation( + OperationType operationType, Callable<ResultSet> executor) { + OperationHandle handle = OperationHandle.create(); + Operation operation = + new Operation( + handle, + operationType, + () -> { + ResultSet resultSet = executor.call(); + List<RowData> rows = resultSet.getData(); + return new ExecutionResult( + new ResultFetcher( + handle, + resultSet.getResultSchema(), + CloseableIterator.adapterForIterator(rows.iterator()), + rows.size())); + }); + submittedOperations.put(handle, operation); + operation.run(service); + return handle; + } + + public void cancelOperation(OperationHandle operationHandle) { + getOperation(operationHandle).cancel(); + } + + public void closeOperation(OperationHandle operationHandle) { + Operation opToRemove = submittedOperations.remove(operationHandle); + if (opToRemove != null) { + opToRemove.close(); + } + } + + public OperationInfo getOperationInfo(OperationHandle operationHandle) { + return getOperation(operationHandle).getOperationInfo(); + } + + public ResultSet fetchResults(OperationHandle operationHandle, long token, int maxRows) { + return getOperation(operationHandle).fetchResults(token, maxRows); + } + + public void close() { + for (OperationHandle handle : submittedOperations.keySet()) { + closeOperation(handle); + } + LOG.info("Closes the Operation Manager."); Review Comment: This can be a DEBUG log, otherwise, every session close will print such a message. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ExecutionResult.java: ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.table.gateway.common.results.ResultSet; + +import static org.apache.flink.table.gateway.service.utils.Constants.FETCH_ALL; + +/** Describe the execution results. */ +public class ExecutionResult { + + private final ResultFetcher resultFetcher; + + public ExecutionResult(ResultFetcher fetcher) { + this.resultFetcher = fetcher; + } + + public ResultSet fetchResults(long token, int maxRows) { + if (maxRows != FETCH_ALL && maxRows <= 0) { Review Comment: What's the usage of `ExecutionResult`? It seems the check for `maxRows` also makes sense in `ResultFetcher`. Can we just wrap `ResultFetcher` in `Operations`? Besides, can we just use `Int.MAX` as the FETCH_ALL? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.utils; + +/** Constants. */ +public class Constants { + + public static final String OPERATION_POOL_NAME = "sql-gateway-operation-pool"; + + public static final int FETCH_ALL = -1; Review Comment: Would be better to move the constant to `ResultFetcher` and explain the special meaning of -1 or `FETCH_ALL` on `fetchResult()` method. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
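One possible shape if the constant and the argument check both move into `ResultFetcher` (sketch only; whether to keep `-1` or switch to `Integer.MAX_VALUE` is the open question from the comment above):

```java
public class ResultFetcher {

    /**
     * Special value for {@code maxFetchSize} meaning "fetch everything that is
     * currently available". Callers could equivalently pass Integer.MAX_VALUE.
     */
    public static final int FETCH_ALL = -1;

    public ResultSet fetchResult(long token, int maxFetchSize) {
        if (maxFetchSize != FETCH_ALL && maxFetchSize <= 0) {
            throw new IllegalArgumentException(
                    String.format("Illegal max fetch size: %s.", maxFetchSize));
        }
        // ... existing fetch logic ...
    }
}
```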
+ */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +/** + * A fetcher to fetch result from submitted statement. + * + * <p>The fetcher uses the {@link Iterator} model. It means every time fetch the result with the + * current token, the fetcher will move forward and retire the old data. + */ +public class ResultFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class); + + private final OperationHandle operationHandle; + + private final ResolvedSchema resultSchema; + private final ResultStore resultStore; + private final LinkedList<RowData> bufferedResults = new LinkedList<>(); + private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>(); + + private long currentToken = 0; + private int previousMaxFetchSize = 0; + private int previousResultSetSize = 0; + private boolean noMoreResults = false; + + public ResultFetcher( + OperationHandle operationHandle, + ResolvedSchema resultSchema, + CloseableIterator<RowData> resultRows, + int maxBufferSize) { + this.operationHandle = operationHandle; + this.resultSchema = resultSchema; + this.resultStore = new ResultStore(resultRows, maxBufferSize); + } + + public void close() { + resultStore.close(); + } + + public ResultSet fetchResult(long token, int maxFetchSize) { + if (token == currentToken) { + // equal to the Iterator.next() + if (noMoreResults) { + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more result for operation: {}.", operationHandle); + } + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + + // a new token arrives, move the current buffer data into the prev buffered results. + bufferedPrevResults.clear(); + if (bufferedResults.isEmpty()) { + // buffered results have been totally consumed, + // so try to fetch new results + Optional<List<RowData>> newResults = resultStore.retrieveRecords(); + if (newResults.isPresent()) { + bufferedResults.addAll(newResults.get()); + } else { + noMoreResults = true; + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + } + + previousMaxFetchSize = maxFetchSize; + if (maxFetchSize > 0) { + previousResultSetSize = Math.min(bufferedResults.size(), maxFetchSize); + } else { + previousResultSetSize = bufferedResults.size(); + } + + LOG.debug( + "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, realReturnSize: {}.", + operationHandle, + token, + maxFetchSize, + previousResultSetSize); Review Comment: This variable is a little confusing to me because it actually reflects the size of the current result. How about changing it to a local variable and naming it `resultSize`? ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationManager.java: ########## @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.operation; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.operation.OperationType; +import org.apache.flink.table.gateway.common.results.OperationInfo; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.result.ExecutionResult; +import org.apache.flink.table.gateway.service.result.ResultFetcher; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; + +/** Manager for the {@link Operation}. */ +public class OperationManager { + + private static final Logger LOG = LoggerFactory.getLogger(OperationManager.class); + + private final Map<OperationHandle, Operation> submittedOperations; + private final ExecutorService service; + + public OperationManager(ExecutorService service) { + this.service = service; + submittedOperations = new ConcurrentHashMap<>(); + } + + public OperationHandle submitOperation( + OperationType operationType, Callable<ResultSet> executor) { + OperationHandle handle = OperationHandle.create(); + Operation operation = + new Operation( + handle, + operationType, + () -> { + ResultSet resultSet = executor.call(); + List<RowData> rows = resultSet.getData(); + return new ExecutionResult( + new ResultFetcher( + handle, + resultSet.getResultSchema(), + CloseableIterator.adapterForIterator(rows.iterator()), + rows.size())); + }); + submittedOperations.put(handle, operation); + operation.run(service); + return handle; + } + + public void cancelOperation(OperationHandle operationHandle) { + getOperation(operationHandle).cancel(); + } + + public void closeOperation(OperationHandle operationHandle) { Review Comment: Maybe we can add a bit more comments for the above method to explain the differences. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/utils/Constants.java: ########## @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.utils; + +/** Constants. */ +public class Constants { + + public static final String OPERATION_POOL_NAME = "sql-gateway-operation-pool"; Review Comment: This constant can sit in `SessionManager` because it's only used there. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +/** + * A fetcher to fetch result from submitted statement. + * + * <p>The fetcher uses the {@link Iterator} model. It means every time fetch the result with the + * current token, the fetcher will move forward and retire the old data. 
+ */ +public class ResultFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class); + + private final OperationHandle operationHandle; + + private final ResolvedSchema resultSchema; + private final ResultStore resultStore; + private final LinkedList<RowData> bufferedResults = new LinkedList<>(); + private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>(); + + private long currentToken = 0; + private int previousMaxFetchSize = 0; + private int previousResultSetSize = 0; + private boolean noMoreResults = false; + + public ResultFetcher( + OperationHandle operationHandle, + ResolvedSchema resultSchema, + CloseableIterator<RowData> resultRows, + int maxBufferSize) { + this.operationHandle = operationHandle; + this.resultSchema = resultSchema; + this.resultStore = new ResultStore(resultRows, maxBufferSize); + } + + public void close() { + resultStore.close(); + } + + public ResultSet fetchResult(long token, int maxFetchSize) { + if (token == currentToken) { + // equal to the Iterator.next() + if (noMoreResults) { + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more result for operation: {}.", operationHandle); + } + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + + // a new token arrives, move the current buffer data into the prev buffered results. + bufferedPrevResults.clear(); + if (bufferedResults.isEmpty()) { + // buffered results have been totally consumed, + // so try to fetch new results + Optional<List<RowData>> newResults = resultStore.retrieveRecords(); + if (newResults.isPresent()) { + bufferedResults.addAll(newResults.get()); + } else { + noMoreResults = true; + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + } + + previousMaxFetchSize = maxFetchSize; + if (maxFetchSize > 0) { + previousResultSetSize = Math.min(bufferedResults.size(), maxFetchSize); + } else { + previousResultSetSize = bufferedResults.size(); + } + + LOG.debug( + "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, realReturnSize: {}.", + operationHandle, + token, + maxFetchSize, + previousResultSetSize); + + // move forward + currentToken++; + moveCurrentResultToPrev(); + return new ResultSet( + ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, bufferedPrevResults); + } else if (token == currentToken - 1 && token >= 0) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Fetching previous result for operation: {}, token: {}, maxFetchSize: {}", + operationHandle, + token, + maxFetchSize); + } + if (previousMaxFetchSize != maxFetchSize) { + String msg = + String.format( + "As the same token is provided, fetch size must be the same. Expecting max_fetch_size to be %s.", Review Comment: Print the current max fetch size as well. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/result/ResultFetcher.java: ########## @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.result; + +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.gateway.common.operation.OperationHandle; +import org.apache.flink.table.gateway.common.results.ResultSet; +import org.apache.flink.table.gateway.service.utils.SqlExecutionException; +import org.apache.flink.util.CloseableIterator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +/** + * A fetcher to fetch result from submitted statement. + * + * <p>The fetcher uses the {@link Iterator} model. It means every time fetch the result with the + * current token, the fetcher will move forward and retire the old data. + */ +public class ResultFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ResultFetcher.class); + + private final OperationHandle operationHandle; + + private final ResolvedSchema resultSchema; + private final ResultStore resultStore; + private final LinkedList<RowData> bufferedResults = new LinkedList<>(); + private final LinkedList<RowData> bufferedPrevResults = new LinkedList<>(); + + private long currentToken = 0; + private int previousMaxFetchSize = 0; + private int previousResultSetSize = 0; + private boolean noMoreResults = false; + + public ResultFetcher( + OperationHandle operationHandle, + ResolvedSchema resultSchema, + CloseableIterator<RowData> resultRows, + int maxBufferSize) { + this.operationHandle = operationHandle; + this.resultSchema = resultSchema; + this.resultStore = new ResultStore(resultRows, maxBufferSize); + } + + public void close() { + resultStore.close(); + } + + public ResultSet fetchResult(long token, int maxFetchSize) { + if (token == currentToken) { + // equal to the Iterator.next() + if (noMoreResults) { + if (LOG.isDebugEnabled()) { + LOG.debug("There is no more result for operation: {}.", operationHandle); + } + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + + // a new token arrives, move the current buffer data into the prev buffered results. 
+ bufferedPrevResults.clear(); + if (bufferedResults.isEmpty()) { + // buffered results have been totally consumed, + // so try to fetch new results + Optional<List<RowData>> newResults = resultStore.retrieveRecords(); + if (newResults.isPresent()) { + bufferedResults.addAll(newResults.get()); + } else { + noMoreResults = true; + return new ResultSet( + ResultSet.ResultType.EOS, null, resultSchema, Collections.emptyList()); + } + } + + previousMaxFetchSize = maxFetchSize; + if (maxFetchSize > 0) { + previousResultSetSize = Math.min(bufferedResults.size(), maxFetchSize); + } else { + previousResultSetSize = bufferedResults.size(); + } + + LOG.debug( + "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, realReturnSize: {}.", + operationHandle, + token, + maxFetchSize, + previousResultSetSize); + + // move forward + currentToken++; + moveCurrentResultToPrev(); Review Comment: We can expand the method here because it's only called here. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationManager; + +import java.io.Closeable; +import java.util.Map; + +/** Session that manages the registered resource including jars, registered table. */ Review Comment: Add more description to the session itself, e.g., what's a session, what is it used for. ``` Similar to HTTP Session, which could maintain user identity and store user-specific data during multiple request/response interactions between a client and the gateway server. ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.utils.Constants; +import org.apache.flink.table.gateway.service.utils.ThreadUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN; + +/** Manage the lifecycle of the {@code Session}. 
*/ +public class SessionManager { + + private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class); + + private final DefaultContext defaultContext; + + private final long idleTimeout; + private final long checkInterval; + private final int maxSessionCount; + + private final Map<SessionHandle, Session> sessions; + + private ExecutorService operationExecutorService; + private ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture<?> timeoutCheckerFuture; + + public SessionManager(DefaultContext defaultContext) { + this.defaultContext = defaultContext; + ReadableConfig conf = defaultContext.getFlinkConfig(); + this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis(); + this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis(); + this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM); + this.sessions = new ConcurrentHashMap<>(); + } + + public void start() { + if (checkInterval > 0 && idleTimeout > 0) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + timeoutCheckerFuture = + scheduledExecutorService.scheduleAtFixedRate( + () -> { + LOG.debug( + "Start to cleanup expired sessions, current session count: {}", + sessions.size()); + for (Map.Entry<SessionHandle, Session> entry : + sessions.entrySet()) { + SessionHandle sessionId = entry.getKey(); + Session session = entry.getValue(); + if (isSessionExpired(session)) { + LOG.info("Session {} is expired, close it...", sessionId); + closeSession(session); + } + } + LOG.debug( + "Removing expired session finished, current session count: {}", + sessions.size()); + }, + checkInterval, + checkInterval, + TimeUnit.MILLISECONDS); + } + + ReadableConfig conf = defaultContext.getFlinkConfig(); + operationExecutorService = + ThreadUtils.newDaemonQueuedThreadPool( + conf.get(SQL_GATEWAY_WORKER_THREADS_MIN), + conf.get(SQL_GATEWAY_WORKER_THREADS_MAX), + conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(), + Constants.OPERATION_POOL_NAME); + } + + public void stop() { + if (scheduledExecutorService != null) { + timeoutCheckerFuture.cancel(true); + scheduledExecutorService.shutdown(); + } + if (operationExecutorService != null) { + operationExecutorService.shutdown(); + } + LOG.info("SessionManager is stopped."); + } + + public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException { + Session session = sessions.get(sessionHandle); + if (session == null) { + String msg = String.format("Session '%s' does not exist.", sessionHandle); + LOG.warn(msg); + throw new SqlGatewayException(msg); + } + session.touch(); + return session; + } + + /** + * Register the session into the {@link SessionManager}. + * + * <p>Use synchronized to keep the checkSessionCount and build the Session are atomic. Review Comment: It seems you are using concurrent map instead of synchronized block. ########## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/utils/SqlGatewayServiceTestResource.java: ########## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.utils; + +import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.table.gateway.common.SqlGatewayService; +import org.apache.flink.table.gateway.service.SqlGatewayServiceImpl; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.session.SessionManager; + +import org.junit.rules.ExternalResource; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR; + +/** Resource for {@link SqlGatewayService}. */ +public class SqlGatewayServiceTestResource extends ExternalResource { + + private SqlGatewayService service; + private SessionManager sessionManager; + + private TemporaryFolder temporaryFolder; + + @Override + protected void before() throws Throwable { + final Map<String, String> originalEnv = System.getenv(); + + try { + // prepare conf dir + temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + File confFolder = temporaryFolder.newFolder("conf"); + File confYaml = new File(confFolder, "flink-conf.yaml"); + if (!confYaml.createNewFile()) { + throw new IOException("Can't create testing flink-conf.yaml file."); + } + + // adjust the test environment for the purposes of this test + Map<String, String> map = new HashMap<>(System.getenv()); + map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath()); + CommonTestUtils.setEnv(map); + + sessionManager = new SessionManager(DefaultContext.load(new HashMap<>())); + } finally { + CommonTestUtils.setEnv(originalEnv); + } + + service = new SqlGatewayServiceImpl(sessionManager); + sessionManager.start(); + } + + @Override + protected void after() { + if (sessionManager != null) { + sessionManager.stop(); + } Review Comment: Delete `temporaryFolder` in `after()`. ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManager.java: ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.gateway.common.session.SessionEnvironment; +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.common.utils.SqlGatewayException; +import org.apache.flink.table.gateway.service.context.DefaultContext; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.utils.Constants; +import org.apache.flink.table.gateway.service.utils.ThreadUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX; +import static org.apache.flink.table.gateway.common.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN; + +/** Manage the lifecycle of the {@code Session}. 
*/ +public class SessionManager { + + private static final Logger LOG = LoggerFactory.getLogger(SessionManager.class); + + private final DefaultContext defaultContext; + + private final long idleTimeout; + private final long checkInterval; + private final int maxSessionCount; + + private final Map<SessionHandle, Session> sessions; + + private ExecutorService operationExecutorService; + private ScheduledExecutorService scheduledExecutorService; + private ScheduledFuture<?> timeoutCheckerFuture; + + public SessionManager(DefaultContext defaultContext) { + this.defaultContext = defaultContext; + ReadableConfig conf = defaultContext.getFlinkConfig(); + this.idleTimeout = conf.get(SQL_GATEWAY_SESSION_IDLE_TIMEOUT).toMillis(); + this.checkInterval = conf.get(SQL_GATEWAY_SESSION_CHECK_INTERVAL).toMillis(); + this.maxSessionCount = conf.get(SQL_GATEWAY_SESSION_MAX_NUM); + this.sessions = new ConcurrentHashMap<>(); + } + + public void start() { + if (checkInterval > 0 && idleTimeout > 0) { + scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + timeoutCheckerFuture = + scheduledExecutorService.scheduleAtFixedRate( + () -> { + LOG.debug( + "Start to cleanup expired sessions, current session count: {}", + sessions.size()); + for (Map.Entry<SessionHandle, Session> entry : + sessions.entrySet()) { + SessionHandle sessionId = entry.getKey(); + Session session = entry.getValue(); + if (isSessionExpired(session)) { + LOG.info("Session {} is expired, close it...", sessionId); + closeSession(session); + } + } + LOG.debug( + "Removing expired session finished, current session count: {}", + sessions.size()); + }, + checkInterval, + checkInterval, + TimeUnit.MILLISECONDS); + } + + ReadableConfig conf = defaultContext.getFlinkConfig(); + operationExecutorService = + ThreadUtils.newDaemonQueuedThreadPool( + conf.get(SQL_GATEWAY_WORKER_THREADS_MIN), + conf.get(SQL_GATEWAY_WORKER_THREADS_MAX), + conf.get(SQL_GATEWAY_WORKER_KEEPALIVE_TIME).toMillis(), + Constants.OPERATION_POOL_NAME); + } + + public void stop() { + if (scheduledExecutorService != null) { + timeoutCheckerFuture.cancel(true); + scheduledExecutorService.shutdown(); + } + if (operationExecutorService != null) { + operationExecutorService.shutdown(); + } + LOG.info("SessionManager is stopped."); + } + + public Session getSession(SessionHandle sessionHandle) throws SqlGatewayException { + Session session = sessions.get(sessionHandle); + if (session == null) { + String msg = String.format("Session '%s' does not exist.", sessionHandle); + LOG.warn(msg); + throw new SqlGatewayException(msg); + } + session.touch(); + return session; + } + + /** + * Register the session into the {@link SessionManager}. + * + * <p>Use synchronized to keep the checkSessionCount and build the Session are atomic. 
+ */ + public synchronized Session openSession(SessionEnvironment environment) + throws SqlGatewayException { + // check session limit + checkSessionCount(); + + Session session = null; + SessionHandle sessionId = null; + do { + sessionId = SessionHandle.create(); + } while (sessions.containsKey(sessionId)); + + SessionContext sessionContext = + SessionContext.create( + defaultContext, + sessionId, + environment.getSessionEndpointVersion(), + Configuration.fromMap(environment.getSessionConfig()), + operationExecutorService); + session = new Session(sessionContext); + sessions.put(sessionId, session); + + LOG.info( + "Session {} is opened, current sessions: {}.", Review Comment: ```suggestion "Session {} is opened, and the number of current sessions is {}.", ``` ########## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/Session.java: ########## @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.gateway.service.session; + +import org.apache.flink.table.gateway.common.session.SessionHandle; +import org.apache.flink.table.gateway.service.context.SessionContext; +import org.apache.flink.table.gateway.service.operation.OperationManager; + +import java.io.Closeable; +import java.util.Map; + +/** Session that manages the registered resource including jars, registered table. */ +public class Session implements Closeable { + + private final SessionContext sessionContext; + private long lastAccessTime; Review Comment: Please add tests for concurrent requests both on single-session and multiple-sessions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
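
For the two `OperationManager` comments above (demote the close log to DEBUG, and document how `cancelOperation` differs from `closeOperation`), a minimal sketch of what that could look like. The method bodies are taken from the diff; the Javadoc wording is only a suggestion, not part of the PR:

```java
/**
 * Cancels the operation: stops the running task but keeps the operation registered in the
 * manager, so its status can still be queried afterwards.
 */
public void cancelOperation(OperationHandle operationHandle) {
    getOperation(operationHandle).cancel();
}

/**
 * Closes the operation: releases its resources and removes it from the manager, so the
 * handle can no longer be resolved afterwards.
 */
public void closeOperation(OperationHandle operationHandle) {
    Operation opToRemove = submittedOperations.remove(operationHandle);
    if (opToRemove != null) {
        opToRemove.close();
    }
}

public void close() {
    for (OperationHandle handle : submittedOperations.keySet()) {
        closeOperation(handle);
    }
    // DEBUG rather than INFO, so that closing every session does not add a log line.
    LOG.debug("Closes the Operation Manager.");
}
```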
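
For the `ExecutionResult`/`Constants`/`ResultFetcher` comments above (use `Integer.MAX_VALUE` as `FETCH_ALL`, turn `previousResultSetSize` into a local `resultSize`, inline `moveCurrentResultToPrev`), a rough sketch of the affected excerpts. The constant's Javadoc is only illustrative, and since the body of `moveCurrentResultToPrev` is not visible in this diff, the removal loop below is a guess at the equivalent logic:

```java
// In ResultFetcher, documented where it is consumed:
/** Passing this value as {@code maxFetchSize} to {@link #fetchResult} returns all buffered rows. */
public static final int FETCH_ALL = Integer.MAX_VALUE;

// Inside fetchResult(token, maxFetchSize), once new results have been buffered:
previousMaxFetchSize = maxFetchSize;

// Local variable instead of the previousResultSetSize field; with FETCH_ALL == Integer.MAX_VALUE
// the separate maxFetchSize > 0 branch is no longer needed.
int resultSize = Math.min(bufferedResults.size(), maxFetchSize);

LOG.debug(
        "Fetching current result for operation: {}, token: {}, maxFetchSize: {}, resultSize: {}.",
        operationHandle, token, maxFetchSize, resultSize);

// Move forward, inlining the former moveCurrentResultToPrev() since this was its only caller.
currentToken++;
for (int i = 0; i < resultSize; i++) {
    bufferedPrevResults.add(bufferedResults.removeFirst());
}
return new ResultSet(
        ResultSet.ResultType.PAYLOAD, currentToken, resultSchema, bufferedPrevResults);
```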
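
For the comment about printing the current max fetch size in the mismatch error, a sketch of the message with both values; the `SqlExecutionException` is assumed from the file's imports, as the PR's actual `throw` is not visible in this hunk:

```java
if (previousMaxFetchSize != maxFetchSize) {
    String msg =
            String.format(
                    "As the same token is provided, fetch size must be the same. "
                            + "Expecting max_fetch_size to be %s, but the current max_fetch_size is %s.",
                    previousMaxFetchSize, maxFetchSize);
    // Exception type assumed; the diff only shows the message construction.
    throw new SqlExecutionException(msg);
}
```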
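
For the `SqlGatewayServiceTestResource` comment, a sketch of cleaning up the temporary conf directory in `after()`, using the JUnit 4 `TemporaryFolder#delete()` method:

```java
@Override
protected void after() {
    if (sessionManager != null) {
        sessionManager.stop();
    }
    // Clean up the temporary conf directory created in before().
    if (temporaryFolder != null) {
        temporaryFolder.delete();
    }
}
```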
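
For the request to add concurrency tests, a hypothetical outline of a multi-session case against `SessionManager#openSession`; the `defaultSessionEnvironment()` helper and the AssertJ assertion are assumptions, not APIs shown in this PR. A single-session variant would follow the same pattern, firing concurrent operations against one opened session instead:

```java
@Test
public void testOpenSessionsConcurrently() throws Exception {
    int threads = 10;
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    try {
        List<Future<Session>> futures = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            // defaultSessionEnvironment() is a hypothetical helper building a SessionEnvironment.
            futures.add(pool.submit(() -> sessionManager.openSession(defaultSessionEnvironment())));
        }
        Set<Session> sessions = new HashSet<>();
        for (Future<Session> future : futures) {
            sessions.add(future.get());
        }
        // Every concurrent call should have produced its own session.
        assertThat(sessions).hasSize(threads);
    } finally {
        pool.shutdownNow();
    }
}
```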
