[
https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16278253#comment-16278253
]
ASF GitHub Bot commented on FLINK-7880:
---------------------------------------
Github user kl0u commented on a diff in the pull request:
https://github.com/apache/flink/pull/5062#discussion_r154885293
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java ---
@@ -251,34 +263,81 @@ private boolean attemptToBind(final int port) throws Throwable {
throw future.cause();
} catch (BindException e) {
log.debug("Failed to start {} on port {}: {}.", serverName, port, e.getMessage());
- shutdown();
+ try {
+ shutdownServer(Time.seconds(10L)).get();
+ } catch (Exception r) {
+
+ // Here we were seeing this problem:
+ // https://github.com/netty/netty/issues/4357 if we do a get().
+ // this is why we now simply wait a bit so that everything is
+ // shut down and then we check
+
+ log.warn("Problem while shutting down {}: {}", serverName, r.getMessage());
+ }
}
// any other type of exception we let it bubble up.
return false;
}
/**
* Shuts down the server and all related thread pools.
+ * @param timeout The time to wait for the shutdown process to complete.
+ * @return A {@link CompletableFuture} that will be completed upon termination of the shutdown process.
*/
- public void shutdown() {
- log.info("Shutting down {} @ {}", serverName, serverAddress);
-
- if (handler != null) {
- handler.shutdown();
- handler = null;
- }
-
- if (queryExecutor != null) {
- queryExecutor.shutdown();
- }
+ public CompletableFuture<Void> shutdownServer(Time timeout) throws InterruptedException {
+ CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
+ if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
+ log.info("Shutting down {} @ {}", serverName, serverAddress);
+
+ final CompletableFuture<Void> groupShutdownFuture = new CompletableFuture<>();
+ if (bootstrap != null) {
+ EventLoopGroup group = bootstrap.group();
+ if (group != null && !group.isShutdown()) {
+ group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+ .addListener(finished -> {
+ if (finished.isSuccess()) {
+ groupShutdownFuture.complete(null);
+ } else {
+ groupShutdownFuture.completeExceptionally(finished.cause());
+ }
+ });
+ } else {
+ groupShutdownFuture.complete(null);
+ }
+ } else {
+ groupShutdownFuture.complete(null);
+ }
- if (bootstrap != null) {
- EventLoopGroup group = bootstrap.group();
- if (group != null) {
- group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+ final CompletableFuture<Void> handlerShutdownFuture = new CompletableFuture<>();
+ if (handler == null) {
+ handlerShutdownFuture.complete(null);
+ } else {
+ handler.shutdown().whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ handlerShutdownFuture.completeExceptionally(throwable);
+ } else {
+ handlerShutdownFuture.complete(null);
+ }
+ });
}
+
+ final CompletableFuture<Void> queryExecShutdownFuture = CompletableFuture.runAsync(() -> {
+ if (queryExecutor != null) {
+ ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, queryExecutor);
+ }
+ });
+
+ CompletableFuture.allOf(
+ queryExecShutdownFuture, groupShutdownFuture, handlerShutdownFuture
+ ).whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ shutdownFuture.completeExceptionally(throwable);
+ } else {
+ shutdownFuture.complete(null);
+ }
+ });
}
- serverAddress = null;
+ return serverShutdownFuture.get();
--- End diff --
I do think so, because this would mean that we can restart a server that
we intentionally shut down in the past, right? (This is only done in
start(), when we fail to bind to a specific port.)
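
To make the restart concern concrete, here is a minimal, self-contained sketch of the compareAndSet guard used in the diff. It is not the code from the pull request: the class ShutdownGuardSketch, the counter, and in particular resetForRestart() are hypothetical names introduced only for illustration, and the actual shutdown work is replaced by an immediately completed future. It shows that once the shutdown future is set, later shutdown calls return the same future without doing any work, so a server that should be started and shut down again needs the reference cleared first.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the server; only the guard logic mirrors the diff.
class ShutdownGuardSketch {

    // null means "no shutdown has been started since the last (re)start".
    private final AtomicReference<CompletableFuture<Void>> serverShutdownFuture =
            new AtomicReference<>(null);

    // Counts how often the shutdown body actually ran (illustration only).
    private final AtomicInteger shutdownRuns = new AtomicInteger();

    CompletableFuture<Void> shutdownServer() {
        CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
        // Only the first caller wins the CAS and performs the shutdown.
        if (serverShutdownFuture.compareAndSet(null, shutdownFuture)) {
            shutdownRuns.incrementAndGet();
            // The real code would wait for the event loop group, handler and executor here.
            shutdownFuture.complete(null);
        }
        // Every caller gets the future installed by the winner.
        return serverShutdownFuture.get();
    }

    // Assumption: some reset like this is needed before the server can be restarted.
    void resetForRestart() {
        serverShutdownFuture.set(null);
    }

    int getShutdownRuns() {
        return shutdownRuns.get();
    }

    public static void main(String[] args) {
        ShutdownGuardSketch server = new ShutdownGuardSketch();
        CompletableFuture<Void> first = server.shutdownServer();
        CompletableFuture<Void> second = server.shutdownServer();
        System.out.println(first == second);          // same future, body ran once
        server.resetForRestart();
        CompletableFuture<Void> third = server.shutdownServer();
        System.out.println(third != first);           // a fresh shutdown after the reset
        System.out.println(server.getShutdownRuns());
    }
}
```

Without the reset, a second start-and-shutdown cycle (for example after a failed bind in start()) would be handed the already completed future from the first cycle.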
> flink-queryable-state-java fails with core-dump
> -----------------------------------------------
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
> Issue Type: Bug
> Components: Queryable State, Tests
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Critical
> Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829