lincoln-lil commented on code in PR #21800:
URL: https://github.com/apache/flink/pull/21800#discussion_r1094098027
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java:
##########
@@ -98,91 +98,85 @@ public class ExecutorImpl implements Executor {
private static final Logger LOG =
LoggerFactory.getLogger(ExecutorImpl.class);
private static final long HEARTBEAT_INTERVAL_MILLISECONDS = 60_000L;
- private final DefaultContext defaultContext;
private final InetSocketAddress gatewayAddress;
- private final long heartbeatInterval;
- private final ExecutorService service;
+ private final ExecutorService executorService;
private final ScheduledExecutorService heartbeatScheduler;
private final RestClient restClient;
+ private final SqlGatewayRestAPIVersion connectionVersion;
private SessionHandle sessionHandle;
- private SqlGatewayRestAPIVersion connectionVersion;
+
+ public static ExecutorImpl create(
+ DefaultContext defaultContext, InetSocketAddress gatewayAddress,
String sessionId) {
+ return create(defaultContext, gatewayAddress, sessionId,
HEARTBEAT_INTERVAL_MILLISECONDS);
+ }
@VisibleForTesting
- public ExecutorImpl(
+ public static ExecutorImpl create(
DefaultContext defaultContext,
InetSocketAddress gatewayAddress,
+ String sessionId,
long heartbeatInterval) {
- this.defaultContext = defaultContext;
- this.gatewayAddress = gatewayAddress;
- this.heartbeatInterval = heartbeatInterval;
- this.service = Executors.newCachedThreadPool();
- this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
- try {
- this.restClient = new RestClient(defaultContext.getFlinkConfig(),
service);
- } catch (Exception e) {
- throw new SqlClientException("Can not create the Rest Client.", e);
- }
- }
-
- public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress
gatewayAddress) {
- this(defaultContext, gatewayAddress, HEARTBEAT_INTERVAL_MILLISECONDS);
- }
-
- public void openSession(String sessionId) {
+ ExecutorService executor = Executors.newCachedThreadPool();
try {
+ RestClient restClient = new
RestClient(defaultContext.getFlinkConfig(), executor);
// determine gateway rest api version
- connectionVersion = negotiateVersion();
+ SqlGatewayRestAPIVersion connectionVersion =
+ negotiateVersion(restClient, gatewayAddress);
// open session to address:port
LOG.info("Open session to {}.", gatewayAddress);
OpenSessionResponseBody response =
- sendRequest(
+ restClient
+ .sendRequest(
+ gatewayAddress.getHostName(),
+ gatewayAddress.getPort(),
OpenSessionHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
new OpenSessionRequestBody(
- sessionId,
defaultContext.getFlinkConfig().toMap()))
+ sessionId,
defaultContext.getFlinkConfig().toMap()),
+ Collections.emptyList(),
+ connectionVersion)
.get();
- sessionHandle = new
SessionHandle(UUID.fromString(response.getSessionHandle()));
- // register heartbeat service
- heartbeatScheduler.scheduleAtFixedRate(
- () ->
- getResponse(
- sendRequest(
-
TriggerSessionHeartbeatHeaders.getInstance(),
- new
SessionMessageParameters(sessionHandle),
- EmptyRequestBody.getInstance())),
- heartbeatInterval,
- heartbeatInterval,
- TimeUnit.MILLISECONDS);
+ SessionHandle sessionHandle =
+ new
SessionHandle(UUID.fromString(response.getSessionHandle()));
+ return new ExecutorImpl(
+ restClient,
+ executor,
+ gatewayAddress,
+ connectionVersion,
+ sessionHandle,
+ heartbeatInterval);
} catch (Exception e) {
- throw new SqlClientException(
- String.format("Failed to open session to %s",
gatewayAddress), e);
+ executor.shutdownNow();
Review Comment:
it may also raise exception
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/SqlClient.java:
##########
@@ -266,16 +265,21 @@ public EmbeddedGateway(DefaultContext defaultContext) {
defaultConfig.addAll(
restConfig,
getSqlGatewayOptionPrefix(SqlGatewayRestEndpointFactory.IDENTIFIER));
-
- sqlGateway = new SqlGateway(defaultConfig, new
SingleSessionManager(defaultContext));
- }
-
- void start() {
+ SqlGateway sqlGateway =
+ new SqlGateway(defaultConfig, new
SingleSessionManager(defaultContext));
try {
sqlGateway.start();
Review Comment:
It's better to add some log here, e.g.,
```
if (LOG.isDebugEnabled()) {
LOG.debug("start embedded gateway on port:{}",
port.getPort());
}
```
##########
flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java:
##########
@@ -98,91 +98,85 @@ public class ExecutorImpl implements Executor {
private static final Logger LOG =
LoggerFactory.getLogger(ExecutorImpl.class);
private static final long HEARTBEAT_INTERVAL_MILLISECONDS = 60_000L;
- private final DefaultContext defaultContext;
private final InetSocketAddress gatewayAddress;
- private final long heartbeatInterval;
- private final ExecutorService service;
+ private final ExecutorService executorService;
private final ScheduledExecutorService heartbeatScheduler;
private final RestClient restClient;
+ private final SqlGatewayRestAPIVersion connectionVersion;
private SessionHandle sessionHandle;
- private SqlGatewayRestAPIVersion connectionVersion;
+
+ public static ExecutorImpl create(
+ DefaultContext defaultContext, InetSocketAddress gatewayAddress,
String sessionId) {
+ return create(defaultContext, gatewayAddress, sessionId,
HEARTBEAT_INTERVAL_MILLISECONDS);
+ }
@VisibleForTesting
- public ExecutorImpl(
+ public static ExecutorImpl create(
DefaultContext defaultContext,
InetSocketAddress gatewayAddress,
+ String sessionId,
long heartbeatInterval) {
- this.defaultContext = defaultContext;
- this.gatewayAddress = gatewayAddress;
- this.heartbeatInterval = heartbeatInterval;
- this.service = Executors.newCachedThreadPool();
- this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
- try {
- this.restClient = new RestClient(defaultContext.getFlinkConfig(),
service);
- } catch (Exception e) {
- throw new SqlClientException("Can not create the Rest Client.", e);
- }
- }
-
- public ExecutorImpl(DefaultContext defaultContext, InetSocketAddress
gatewayAddress) {
- this(defaultContext, gatewayAddress, HEARTBEAT_INTERVAL_MILLISECONDS);
- }
-
- public void openSession(String sessionId) {
+ ExecutorService executor = Executors.newCachedThreadPool();
try {
+ RestClient restClient = new
RestClient(defaultContext.getFlinkConfig(), executor);
// determine gateway rest api version
- connectionVersion = negotiateVersion();
+ SqlGatewayRestAPIVersion connectionVersion =
+ negotiateVersion(restClient, gatewayAddress);
// open session to address:port
LOG.info("Open session to {}.", gatewayAddress);
OpenSessionResponseBody response =
- sendRequest(
+ restClient
+ .sendRequest(
+ gatewayAddress.getHostName(),
+ gatewayAddress.getPort(),
OpenSessionHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
new OpenSessionRequestBody(
- sessionId,
defaultContext.getFlinkConfig().toMap()))
+ sessionId,
defaultContext.getFlinkConfig().toMap()),
+ Collections.emptyList(),
+ connectionVersion)
.get();
- sessionHandle = new
SessionHandle(UUID.fromString(response.getSessionHandle()));
- // register heartbeat service
- heartbeatScheduler.scheduleAtFixedRate(
- () ->
- getResponse(
- sendRequest(
-
TriggerSessionHeartbeatHeaders.getInstance(),
- new
SessionMessageParameters(sessionHandle),
- EmptyRequestBody.getInstance())),
- heartbeatInterval,
- heartbeatInterval,
- TimeUnit.MILLISECONDS);
+ SessionHandle sessionHandle =
+ new
SessionHandle(UUID.fromString(response.getSessionHandle()));
+ return new ExecutorImpl(
+ restClient,
+ executor,
+ gatewayAddress,
+ connectionVersion,
+ sessionHandle,
+ heartbeatInterval);
} catch (Exception e) {
- throw new SqlClientException(
- String.format("Failed to open session to %s",
gatewayAddress), e);
+ executor.shutdownNow();
+ throw new SqlClientException("Failed to create the executor.", e);
}
}
- public void closeSession() throws SqlExecutionException {
- if (sessionHandle == null) {
- return;
- }
- try {
- CompletableFuture<CloseSessionResponseBody> response =
- sendRequest(
- CloseSessionHeaders.getInstance(),
- new SessionMessageParameters(sessionHandle),
- EmptyRequestBody.getInstance());
+ private ExecutorImpl(
+ RestClient restClient,
+ ExecutorService executor,
+ InetSocketAddress gatewayAddress,
+ SqlGatewayRestAPIVersion connectionVersion,
+ SessionHandle sessionHandle,
+ long heartbeatInterval) {
+ this.restClient = restClient;
+ this.executorService = executor;
+ this.gatewayAddress = gatewayAddress;
+ this.sessionHandle = sessionHandle;
+ this.connectionVersion = connectionVersion;
- if (!response.get().getStatus().equals(CLOSE_MESSAGE)) {
- LOG.warn("The status of close session response isn't {}.",
CLOSE_MESSAGE);
- }
- } catch (Exception e) {
- LOG.warn(
- String.format(
- "Unexpected error occurs when closing session
%s.", sessionHandle),
- e);
- // ignore any throwable to keep the cleanup running
- } finally {
- sessionHandle = null;
- }
+ this.heartbeatScheduler = Executors.newSingleThreadScheduledExecutor();
Review Comment:
Can we move this scheduler into create method and use a unified resource
mangager, such as 'AutoCloseableRegistry' to register and close all resources
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]