Repository: cassandra Updated Branches: refs/heads/trunk e296ff063 -> ebefc96a8
Correctly close netty channels when a stream session ends patch by jasobrown; reviewed by Ariel Weisberg for CASSANDRA-13905 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ebefc96a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ebefc96a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ebefc96a Branch: refs/heads/trunk Commit: ebefc96a8fe63aca5f324984f7f3147f10218643 Parents: e296ff0 Author: Jason Brown <[email protected]> Authored: Mon Sep 25 15:39:17 2017 -0700 Committer: Jason Brown <[email protected]> Committed: Fri Sep 29 09:33:08 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../async/NettyStreamingMessageSender.java | 24 ++++++++++++-------- .../org/apache/cassandra/utils/FBUtilities.java | 17 +++++++++++++- 3 files changed, 31 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefc96a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ea73fcd..99b5a59 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Correctly close netty channels when a stream session ends (CASSANDRA-13905) * Update lz4 to 1.4.0 (CASSANDRA-13741) * Optimize Paxos prepare and propose stage for local requests (CASSANDRA-13862) * Throttle base partitions during MV repair streaming to prevent OOM (CASSANDRA-13299) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefc96a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java index f872005..0b38760 100644 --- a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java +++ b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java @@ -21,7 +21,9 @@ package org.apache.cassandra.streaming.async; import java.io.IOError; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue; @@ -106,10 +108,9 @@ public class NettyStreamingMessageSender implements StreamingMessageSender private final ThreadPoolExecutor fileTransferExecutor; /** - * A {@link ThreadLocal} used by the threads in {@link #fileTransferExecutor} to stash references to constructed - * and connected {@link Channel}s. + * A mapping of each {@link #fileTransferExecutor} thread to a channel that can be written to (on that thread). */ - private final ConcurrentMap<Thread, Channel> threadLocalChannel = new ConcurrentHashMap<>(); + private final ConcurrentMap<Thread, Channel> threadToChannelMap = new ConcurrentHashMap<>(); /** * A netty channel attribute used to indicate if a channel is currently transferring a file. This is primarily used @@ -373,12 +374,12 @@ public class NettyStreamingMessageSender implements StreamingMessageSender Thread currentThread = Thread.currentThread(); try { - Channel channel = threadLocalChannel.get(currentThread); + Channel channel = threadToChannelMap.get(currentThread); if (channel != null) return channel; channel = createChannel(); - threadLocalChannel.put(currentThread, channel); + threadToChannelMap.put(currentThread, channel); return channel; } catch (Exception e) @@ -393,10 +394,10 @@ public class NettyStreamingMessageSender implements StreamingMessageSender void injectChannel(Channel channel) { Thread currentThread = Thread.currentThread(); - if (threadLocalChannel.get(currentThread) != null) + if (threadToChannelMap.get(currentThread) != null) throw new IllegalStateException("previous channel already set"); - threadLocalChannel.put(currentThread, channel); + threadToChannelMap.put(currentThread, channel); } /** @@ -404,7 +405,7 @@ public class NettyStreamingMessageSender implements StreamingMessageSender */ void unsetChannel() { - threadLocalChannel.remove(Thread.currentThread()); + threadToChannelMap.remove(Thread.currentThread()); } } @@ -498,8 +499,11 @@ public class NettyStreamingMessageSender implements StreamingMessageSender channelKeepAlives.stream().map(scheduledFuture -> scheduledFuture.cancel(false)); channelKeepAlives.clear(); - threadLocalChannel.values().stream().map(channel -> channel.close()); - threadLocalChannel.clear(); + List<Future<Void>> futures = new ArrayList<>(threadToChannelMap.size()); + for (Channel channel : threadToChannelMap.values()) + futures.add(channel.close()); + FBUtilities.waitOnFutures(futures, 10 * 1000); + threadToChannelMap.clear(); fileTransferExecutor.shutdownNow(); if (controlMessageChannel != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ebefc96a/src/java/org/apache/cassandra/utils/FBUtilities.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java b/src/java/org/apache/cassandra/utils/FBUtilities.java index 319512d..f45a1ab 100644 --- a/src/java/org/apache/cassandra/utils/FBUtilities.java +++ b/src/java/org/apache/cassandra/utils/FBUtilities.java @@ -383,13 +383,28 @@ public class FBUtilities public static <T> List<T> waitOnFutures(Iterable<? extends Future<? extends T>> futures) { + return waitOnFutures(futures, -1); + } + + /** + * Block for a collection of futures, with an optional timeout for each future. + * + * @param futures + * @param ms The number of milliseconds to wait on each future. If this value is less than or equal to zero, + * no tiemout value will be passed to {@link Future#get()}. + */ + public static <T> List<T> waitOnFutures(Iterable<? extends Future<? extends T>> futures, long ms) + { List<T> results = new ArrayList<>(); Throwable fail = null; for (Future<? extends T> f : futures) { try { - results.add(f.get()); + if (ms <= 0) + results.add(f.get()); + else + results.add(f.get(ms, TimeUnit.MILLISECONDS)); } catch (Throwable t) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
