LadyForest commented on code in PR #21771:
URL: https://github.com/apache/flink/pull/21771#discussion_r1090161462


##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN;
+
+/**
+ * The implementation of the {@link SessionManager} that manage the lifecycle 
of the {@code
+ * Session}.
+ */
+public class SessionManagerImpl implements SessionManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SessionManagerImpl.class);
+    private static final String OPERATION_POOL_NAME = 
"sql-gateway-operation-pool";
+
+    protected final DefaultContext defaultContext;
+
+    private final long idleTimeout;
+    private final long checkInterval;
+    private final int maxSessionCount;
+
+    private final Map<SessionHandle, Session> sessions;
+
+    private ExecutorService operationExecutorService;
+    private ScheduledExecutorService scheduledExecutorService;

Review Comment:
   Nit: rename to a more meaningful name, like `cleanupExecutorService`?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN;
+
+/**
+ * The implementation of the {@link SessionManager} that manage the lifecycle 
of the {@code
+ * Session}.
+ */
+public class SessionManagerImpl implements SessionManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SessionManagerImpl.class);
+    private static final String OPERATION_POOL_NAME = 
"sql-gateway-operation-pool";
+
+    protected final DefaultContext defaultContext;

Review Comment:
   `defaultContext` can be `final`?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/SingleSessionManager.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.resource.ClientResourceManager;
+import org.apache.flink.table.client.util.ClientClassloaderUtil;
+import org.apache.flink.table.client.util.ClientWrapperClassLoader;
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.session.Session;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.Preconditions;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link SessionManager} only has one session at most.

Review Comment:
   Nit: add more description about the goal that {@link 
SingleSessionManager} is meant to achieve?



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/session/SessionManagerImplTest.java:
##########
@@ -54,7 +54,7 @@ public void setup() {
                 
SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL,
                 Duration.ofMillis(100));
         conf.set(SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM, 
3);
-        sessionManager = new SessionManager(new DefaultContext(conf, 
Collections.emptyList()));
+        sessionManager = new SessionManagerImpl(new DefaultContext(conf, 
Collections.emptyList()));

Review Comment:
   Nit: use `SessionManager#create`?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/DefaultContextUtils.java:
##########
@@ -118,6 +98,8 @@ private static List<URL> discoverDependencies(List<URL> 
jars, List<URL> librarie
             LOG.debug("Using the following dependencies: {}", dependencies);
         }
 
+        // add python dependencies
+        dependencies.addAll(DefaultContext.discoverPythonDependencies());

Review Comment:
   I feel it is a little weird for `DefaultContextUtils` to create a 
`DefaultContext` instance while needing `DefaultContext` to provide the args. 
Can this logic be encapsulated within `DefaultContext` itself?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java:
##########
@@ -154,10 +154,6 @@ public void openSession(@Nullable String sessionId) {
                     heartbeatInterval,
                     heartbeatInterval,
                     TimeUnit.MILLISECONDS);
-            // register dependencies

Review Comment:
   Comment for L#132, I think sessionId should not be null?



##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/SingleSessionManager.java:
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.gateway.local;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.client.SqlClientException;
+import org.apache.flink.table.client.resource.ClientResourceManager;
+import org.apache.flink.table.client.util.ClientClassloaderUtil;
+import org.apache.flink.table.client.util.ClientWrapperClassLoader;
+import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
+import org.apache.flink.table.gateway.service.operation.OperationManager;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.session.Session;
+import org.apache.flink.table.gateway.service.session.SessionManager;
+import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.Preconditions;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * A {@link SessionManager} only has one session at most.
+ *
+ * <p>The special {@link SessionManager} is used in the Sql Client embedded 
mode and doesn't support
+ * concurrently modification.
+ */
+public class SingleSessionManager implements SessionManager {
+
+    private final DefaultContext defaultContext;
+    private final ExecutorService operationExecutorService;
+
+    private Session session;
+
+    public SingleSessionManager(DefaultContext defaultContext) {
+        this.defaultContext = defaultContext;
+        this.operationExecutorService = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public void start() {}
+
+    @Override
+    public void stop() {
+        operationExecutorService.shutdown();
+    }
+
+    @Override
+    public Session getSession(SessionHandle sessionHandle) throws 
SqlGatewayException {
+        Preconditions.checkArgument(
+                session != null && 
sessionHandle.equals(session.getSessionHandle()),
+                "The specified session doesn't exists");
+        return session;
+    }
+
+    @Override
+    public synchronized Session openSession(SessionEnvironment environment)

Review Comment:
   No need to add `synchronized` here?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java:
##########
@@ -152,4 +155,23 @@ public static DefaultContext load(Configuration 
dynamicConfig, List<URL> depende
 
         return new DefaultContext(configuration, dependencies);
     }
+
+    public static List<URL> discoverPythonDependencies() {

Review Comment:
   Perhaps we don't need to expose this method.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/session/SessionManagerImpl.java:
##########
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.session;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.SqlGatewayException;
+import org.apache.flink.table.gateway.api.utils.ThreadUtils;
+import org.apache.flink.table.gateway.service.context.DefaultContext;
+import org.apache.flink.table.gateway.service.context.SessionContext;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_CHECK_INTERVAL;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_IDLE_TIMEOUT;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_SESSION_MAX_NUM;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_KEEPALIVE_TIME;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MAX;
+import static 
org.apache.flink.table.gateway.api.config.SqlGatewayServiceConfigOptions.SQL_GATEWAY_WORKER_THREADS_MIN;
+
+/**
+ * The implementation of the {@link SessionManager} that manage the lifecycle 
of the {@code
+ * Session}.
+ */
+public class SessionManagerImpl implements SessionManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(SessionManagerImpl.class);
+    private static final String OPERATION_POOL_NAME = 
"sql-gateway-operation-pool";
+
+    protected final DefaultContext defaultContext;
+
+    private final long idleTimeout;
+    private final long checkInterval;
+    private final int maxSessionCount;
+
+    private final Map<SessionHandle, Session> sessions;
+
+    private ExecutorService operationExecutorService;
+    private ScheduledExecutorService scheduledExecutorService;
+    private ScheduledFuture<?> timeoutCheckerFuture;

Review Comment:
   Add `@Nullable` annotation for `timeoutCheckerFuture` and 
`scheduledExecutorService`



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/DefaultContext.java:
##########
@@ -152,4 +155,23 @@ public static DefaultContext load(Configuration 
dynamicConfig, List<URL> depende
 
         return new DefaultContext(configuration, dependencies);
     }
+

Review Comment:
   What about 
   
   ```java
   public static DefaultContext load(Configuration dynamicConfig, List<URL> 
dependencies) {
           return load(dynamicConfig, dependencies, false);
       }
   
       public static DefaultContext load(
               Configuration dynamicConfig, List<URL> dependencies, boolean 
discoverPythonDependency) {
           // 1. find the configuration directory
           String flinkConfigDir = 
CliFrontend.getConfigurationDirectoryFromEnv();
   
           // 2. load the global configuration
           Configuration configuration = 
GlobalConfiguration.loadConfiguration(flinkConfigDir);
           configuration.addAll(dynamicConfig);
   
           // 3. load the custom command lines
           List<CustomCommandLine> commandLines =
                   CliFrontend.loadCustomCommandLines(configuration, 
flinkConfigDir);
   
           // initialize default file system
           FileSystem.initialize(
                   configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
   
           if (discoverPythonDependency) {
               dependencies = new ArrayList<>(dependencies);
               dependencies.addAll(discoverPythonDependencies());
           }
   
           Options commandLineOptions = collectCommandLineOptions(commandLines);
   
           try {
               CommandLine deploymentCommandLine =
                       CliFrontendParser.parse(commandLineOptions, new String[] 
{}, true);
               configuration.addAll(
                       createExecutionConfig(
                               deploymentCommandLine, commandLineOptions, 
commandLines, dependencies));
           } catch (Exception e) {
               throw new SqlGatewayException(
                       "Could not load available CLI with Environment 
Deployment entry.", e);
           }
   
           return new DefaultContext(configuration, dependencies);
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to