[ https://issues.apache.org/jira/browse/FLINK-7880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16260869#comment-16260869 ]
ASF GitHub Bot commented on FLINK-7880:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5038#discussion_r152300724
--- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java ---
@@ -166,28 +167,57 @@ public String getClientName() {
      * Shuts down the client and closes all connections.
      *
      * <p>After a call to this method, all returned futures will be failed.
+     *
+     * @return A {@link CompletableFuture} that will be completed when the shutdown process is done.
      */
-    public void shutdown() {
-        if (shutDown.compareAndSet(false, true)) {
+    public CompletableFuture<?> shutdown() {
+        final CompletableFuture<?> newShutdownFuture = new CompletableFuture<>();
+        if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
+
+            final List<CompletableFuture<?>> connectionFutures = new ArrayList<>();
+
             for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
                 if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-                    conn.getValue().close();
+                    connectionFutures.add(conn.getValue().close());
                 }
             }
             for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet()) {
                 if (pendingConnections.remove(conn.getKey()) != null) {
-                    conn.getValue().close();
+                    connectionFutures.add(conn.getValue().close());
                 }
             }
-            if (bootstrap != null) {
-                EventLoopGroup group = bootstrap.group();
-                if (group != null) {
-                    group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
+            CompletableFuture.allOf(
+                connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()])
+            ).whenComplete((result, throwable) -> {
+                if (throwable != null) {
+                    newShutdownFuture.completeExceptionally(throwable);
+                } else if (bootstrap != null) {
+                    EventLoopGroup group = bootstrap.group();
+                    if (group != null && !group.isShutdown()) {
+                        group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
+                            .addListener(finished -> {
+                                if (finished.isSuccess()) {
+                                    newShutdownFuture.complete(null);
+                                } else {
+                                    newShutdownFuture.completeExceptionally(finished.cause());
+                                }
+                            });
+                    } else {
+                        newShutdownFuture.complete(null);
+                    }
+                } else {
+                    newShutdownFuture.complete(null);
                 }
+            });
+
+            // check again if in the meantime another thread completed the future
+            if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
--- End diff --
Where in close() do we set the shutdown future to null? I only see that
being done in sendRequest (which seems fishy).
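
To illustrate the concern, here is a minimal sketch of the compare-and-set shutdown pattern in isolation. The class `IdempotentShutdown` and its `releaseCount` counter are hypothetical stand-ins, not Flink's `Client`. The sketch assumes the reference is never reset to null, so only the first caller wins the CAS and every later caller gets the same future back. Under that assumption, a second `compareAndSet(null, ...)` inside the winning branch could not succeed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a client with an idempotent shutdown; not Flink's Client. */
class IdempotentShutdown {

    // null means "shutdown has not been requested yet"; this sketch never resets it.
    private final AtomicReference<CompletableFuture<Void>> shutdownFuture =
            new AtomicReference<>();

    // Counts how often resources were actually released (illustration only).
    private final AtomicInteger releaseCount = new AtomicInteger();

    CompletableFuture<Void> shutdown() {
        final CompletableFuture<Void> candidate = new CompletableFuture<>();
        if (shutdownFuture.compareAndSet(null, candidate)) {
            // Only the caller that wins the CAS releases resources.
            releaseCount.incrementAndGet();
            candidate.complete(null);
            return candidate;
        }
        // Another caller won the race; hand back the future it installed.
        return shutdownFuture.get();
    }

    int releaseCount() {
        return releaseCount.get();
    }
}
```

Calling `shutdown()` repeatedly on one instance returns the same future each time and releases resources once.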
> flink-queryable-state-java fails with core-dump
> -----------------------------------------------
>
> Key: FLINK-7880
> URL: https://issues.apache.org/jira/browse/FLINK-7880
> Project: Flink
> Issue Type: Bug
> Components: Queryable State, Tests
> Affects Versions: 1.4.0
> Reporter: Till Rohrmann
> Assignee: Kostas Kloudas
> Priority: Critical
> Labels: test-stability
>
> The {{flink-queryable-state-java}} module fails on Travis with a core dump.
> https://travis-ci.org/tillrohrmann/flink/jobs/289949829