move streaming to use netty; patch by jasobrown, reviewed by aweisberg for CASSANDRA-12229
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fc92db2b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fc92db2b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fc92db2b Branch: refs/heads/trunk Commit: fc92db2b9b56c143516026ba29cecdec37e286bb Parents: 356dc3c Author: Jason Brown <jasedbr...@gmail.com> Authored: Mon Apr 11 05:26:18 2016 -0700 Committer: Jason Brown <jasedbr...@gmail.com> Committed: Tue Aug 22 13:54:44 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + lib/compress-lzf-0.8.4.jar | Bin 25490 -> 0 bytes .../org/apache/cassandra/config/Config.java | 6 - .../cassandra/config/DatabaseDescriptor.java | 15 - .../exceptions/ChecksumMismatchException.java | 34 ++ .../io/compress/CompressionMetadata.java | 4 +- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../io/util/DataIntegrityMetadata.java | 26 + .../net/IncomingStreamingConnection.java | 104 ---- .../net/async/ByteBufDataInputPlus.java | 12 + .../net/async/ByteBufDataOutputStreamPlus.java | 191 +++++++ .../net/async/InboundHandshakeHandler.java | 36 +- .../cassandra/net/async/NettyFactory.java | 24 +- .../net/async/OutboundConnectionIdentifier.java | 21 +- .../net/async/OutboundHandshakeHandler.java | 9 +- .../async/RebufferingByteBufDataInputPlus.java | 250 +++++++++ .../apache/cassandra/security/SSLFactory.java | 49 -- .../cassandra/service/StorageService.java | 11 - .../cassandra/service/StorageServiceMBean.java | 3 - .../cassandra/streaming/ConnectionHandler.java | 428 ---------------- .../streaming/DefaultConnectionFactory.java | 122 +++-- .../streaming/StreamConnectionFactory.java | 11 +- .../cassandra/streaming/StreamCoordinator.java | 22 +- .../cassandra/streaming/StreamManager.java | 24 +- .../apache/cassandra/streaming/StreamPlan.java | 2 +- .../cassandra/streaming/StreamReader.java | 25 +- .../streaming/StreamReceiveException.java | 36 
++ .../cassandra/streaming/StreamReceiveTask.java | 1 + .../cassandra/streaming/StreamResultFuture.java | 32 +- .../cassandra/streaming/StreamSession.java | 396 ++++++++------- .../cassandra/streaming/StreamTransferTask.java | 10 +- .../cassandra/streaming/StreamWriter.java | 115 +++-- .../streaming/StreamingMessageSender.java | 34 ++ .../async/NettyStreamingMessageSender.java | 508 +++++++++++++++++++ .../async/StreamCompressionSerializer.java | 133 +++++ .../async/StreamingInboundHandler.java | 268 ++++++++++ .../cassandra/streaming/async/package-info.java | 71 +++ .../ByteBufCompressionDataOutputStreamPlus.java | 76 +++ .../compress/CompressedInputStream.java | 225 ++++---- .../compress/CompressedStreamReader.java | 17 +- .../compress/CompressedStreamWriter.java | 25 +- .../compress/StreamCompressionInputStream.java | 78 +++ .../streaming/messages/CompleteMessage.java | 10 +- .../streaming/messages/FileMessageHeader.java | 38 +- .../streaming/messages/IncomingFileMessage.java | 30 +- .../streaming/messages/KeepAliveMessage.java | 9 +- .../streaming/messages/OutgoingFileMessage.java | 28 +- .../streaming/messages/PrepareAckMessage.java | 57 +++ .../streaming/messages/PrepareMessage.java | 93 ---- .../messages/PrepareSynAckMessage.java | 80 +++ .../streaming/messages/PrepareSynMessage.java | 98 ++++ .../streaming/messages/ReceivedMessage.java | 11 +- .../streaming/messages/RetryMessage.java | 71 --- .../messages/SessionFailedMessage.java | 10 +- .../streaming/messages/StreamInitMessage.java | 73 +-- .../streaming/messages/StreamMessage.java | 58 +-- .../tools/BulkLoadConnectionFactory.java | 32 +- .../org/apache/cassandra/tools/NodeProbe.java | 7 - .../cassandra/tools/nodetool/GetTimeout.java | 2 +- .../org/apache/cassandra/utils/UUIDGen.java | 8 +- .../cassandra/streaming/LongStreamingTest.java | 34 +- .../cassandra/cql3/PreparedStatementsTest.java | 2 +- .../util/RewindableDataInputStreamPlusTest.java | 2 +- .../net/async/HandshakeHandlersTest.java | 4 +- 
.../net/async/InboundHandshakeHandlerTest.java | 8 +- .../async/OutboundMessagingConnectionTest.java | 45 -- .../RebufferingByteBufDataInputPlusTest.java | 126 +++++ .../net/async/TestScheduledFuture.java | 66 +++ .../apache/cassandra/service/RemoveTest.java | 3 +- .../streaming/StreamTransferTaskTest.java | 7 +- .../streaming/StreamingTransferTest.java | 28 - .../async/NettyStreamingMessageSenderTest.java | 202 ++++++++ .../async/StreamCompressionSerializerTest.java | 135 +++++ .../async/StreamingInboundHandlerTest.java | 168 ++++++ .../compression/CompressedInputStreamTest.java | 10 +- 75 files changed, 3508 insertions(+), 1504 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f2e643e..a14e390 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * use netty for streaming (CASSANDRA-12229) * Use netty for internode messaging (CASSANDRA-8457) * Add bytes repaired/unrepaired to nodetool tablestats (CASSANDRA-13774) * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/lib/compress-lzf-0.8.4.jar ---------------------------------------------------------------------- diff --git a/lib/compress-lzf-0.8.4.jar b/lib/compress-lzf-0.8.4.jar deleted file mode 100644 index a712c24..0000000 Binary files a/lib/compress-lzf-0.8.4.jar and /dev/null differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 77d5bf4..537cf39 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ 
b/src/java/org/apache/cassandra/config/Config.java @@ -97,12 +97,6 @@ public class Config public volatile long truncate_request_timeout_in_ms = 60000L; - /** - * @deprecated use {@link #streaming_keep_alive_period_in_secs} instead - */ - @Deprecated - public int streaming_socket_timeout_in_ms = 86400000; //24 hours - public Integer streaming_connections_per_host = 1; public Integer streaming_keep_alive_period_in_secs = 300; //5 minutes http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 53bac93..302a528 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -2060,21 +2060,6 @@ public class DatabaseDescriptor conf.counter_cache_keys_to_save = counterCacheKeysToSave; } - public static void setStreamingSocketTimeout(int value) - { - conf.streaming_socket_timeout_in_ms = value; - } - - /** - * @deprecated use {@link #getStreamingKeepAlivePeriod()} instead - * @return streaming_socket_timeout_in_ms property - */ - @Deprecated - public static int getStreamingSocketTimeout() - { - return conf.streaming_socket_timeout_in_ms; - } - public static int getStreamingKeepAlivePeriod() { return conf.streaming_keep_alive_period_in_secs; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java b/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java new file mode 100644 index 0000000..a76c46c --- /dev/null +++ 
b/src/java/org/apache/cassandra/exceptions/ChecksumMismatchException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.exceptions; + +import java.io.IOException; + +public class ChecksumMismatchException extends IOException +{ + public ChecksumMismatchException() + { + super(); + } + + public ChecksumMismatchException(String s) + { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index 6c1849f..8ac6589 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -145,7 +145,9 @@ public class CompressionMetadata this.chunkOffsetsSize = chunkOffsets.size(); } - private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength) + // do not call this constructor directly, 
unless used in testing + @VisibleForTesting + public CompressionMetadata(String filePath, CompressionParams parameters, Memory offsets, long offsetsSize, long dataLength, long compressedLength) { this.indexFilePath = filePath; this.parameters = parameters; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java index ff47bec..dc56520 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java @@ -208,7 +208,7 @@ public class SSTableLoader implements StreamEventHandler for (SSTableReader sstable : sstables) { sstable.selfRef().release(); - assert sstable.selfRef().globalCount() == 0; + assert sstable.selfRef().globalCount() == 0 : String.format("for sstable = %s, ref count = %d", sstable, sstable.selfRef().globalCount()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java index 91b189d..277b359 100644 --- a/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java +++ b/src/java/org/apache/cassandra/io/util/DataIntegrityMetadata.java @@ -20,9 +20,12 @@ package org.apache.cassandra.io.util; import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.zip.CheckedInputStream; import java.util.zip.Checksum; +import com.google.common.annotations.VisibleForTesting; + import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.Descriptor; 
import org.apache.cassandra.utils.ChecksumType; @@ -57,6 +60,15 @@ public class DataIntegrityMetadata chunkSize = reader.readInt(); } + @VisibleForTesting + protected ChecksumValidator(ChecksumType checksumType, RandomAccessReader reader, int chunkSize) + { + this.checksumType = checksumType; + this.reader = reader; + this.dataFilename = null; + this.chunkSize = chunkSize; + } + public void seek(long offset) { long start = chunkStart(offset); @@ -77,6 +89,20 @@ public class DataIntegrityMetadata throw new IOException("Corrupted File : " + dataFilename); } + /** + * validates the checksum with the bytes from the specified buffer. + * + * Upon return, the buffer's position will + * be updated to its limit; its limit will not have been changed. + */ + public void validate(ByteBuffer buffer) throws IOException + { + int current = (int) checksumType.of(buffer); + int actual = reader.readInt(); + if (current != actual) + throw new IOException("Corrupted File : " + dataFilename); + } + public void close() { reader.close(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java b/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java deleted file mode 100644 index 8db5fcb..0000000 --- a/src/java/org/apache/cassandra/net/IncomingStreamingConnection.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.net; - -import java.io.Closeable; -import java.io.IOException; -import java.net.Socket; -import java.util.Set; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataInputPlus.DataInputStreamPlus; -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.streaming.StreamResultFuture; -import org.apache.cassandra.streaming.messages.StreamInitMessage; -import org.apache.cassandra.streaming.messages.StreamMessage; - -/** - * Thread to consume stream init messages. - */ -public class IncomingStreamingConnection extends Thread implements Closeable -{ - private static final Logger logger = LoggerFactory.getLogger(IncomingStreamingConnection.class); - - private final int version; - public final Socket socket; - private final Set<Closeable> group; - - public IncomingStreamingConnection(int version, Socket socket, Set<Closeable> group) - { - super("STREAM-INIT-" + socket.getRemoteSocketAddress()); - this.version = version; - this.socket = socket; - this.group = group; - } - - @Override - @SuppressWarnings("resource") // Not closing constructed DataInputPlus's as the stream needs to remain open. - public void run() - { - try - { - // streaming connections are per-session and have a fixed version. - // we can't do anything with a wrong-version stream connection, so drop it. 
- if (version != StreamMessage.CURRENT_VERSION) - throw new IOException(String.format("Received stream using protocol version %d (my version %d). Terminating connection", version, StreamMessage.CURRENT_VERSION)); - - DataInputPlus input = new DataInputStreamPlus(socket.getInputStream()); - StreamInitMessage init = StreamInitMessage.serializer.deserialize(input, version); - - //Set SO_TIMEOUT on follower side - if (!init.isForOutgoing) - socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); - - // The initiator makes two connections, one for incoming and one for outgoing. - // The receiving side distinguish two connections by looking at StreamInitMessage#isForOutgoing. - // Note: we cannot use the same socket for incoming and outgoing streams because we want to - // parallelize said streams and the socket is blocking, so we might deadlock. - StreamResultFuture.initReceivingSide(init.sessionIndex, init.planId, init.streamOperation, init.from, this, init.isForOutgoing, version, init.keepSSTableLevel, init.pendingRepair, init.previewKind); - } - catch (Throwable t) - { - logger.error("Error while reading from socket from {}.", socket.getRemoteSocketAddress(), t); - close(); - } - } - - @Override - public void close() - { - try - { - if (!socket.isClosed()) - { - socket.close(); - } - } - catch (IOException e) - { - logger.debug("Error closing socket", e); - } - finally - { - group.remove(this); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java index f9fa07a..23e532c 100644 --- a/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java +++ b/src/java/org/apache/cassandra/net/async/ByteBufDataInputPlus.java @@ -24,8 +24,20 @@ import 
org.apache.cassandra.io.util.DataInputPlus; public class ByteBufDataInputPlus extends ByteBufInputStream implements DataInputPlus { + /** + * The parent class does not expose the buffer to derived classes, so we need + * to stash a reference here so it can be exposed via {@link #buffer()}. + */ + private final ByteBuf buf; + public ByteBufDataInputPlus(ByteBuf buffer) { super(buffer); + this.buf = buffer; + } + + public ByteBuf buffer() + { + return buf; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java new file mode 100644 index 0000000..3a544e4 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/ByteBufDataOutputStreamPlus.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net.async; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.streaming.StreamSession; + +/** + * A {@link DataOutputStreamPlus} that writes to a {@link ByteBuf}. The novelty here is that all writes + * actually get written in to a {@link ByteBuffer} that shares a backing buffer with a {@link ByteBuf}. + * The trick to do that is allocate the ByteBuf, get a ByteBuffer from it by calling {@link ByteBuf#nioBuffer()}, + * and passing that to the super class as {@link #buffer}. When the {@link #buffer} is full or {@link #doFlush(int)} + * is invoked, the {@link #currentBuf} is published to the netty channel. + */ +public class ByteBufDataOutputStreamPlus extends BufferedDataOutputStreamPlus +{ + private final StreamSession session; + private final Channel channel; + private final int bufferSize; + + /** + * Tracks how many bytes we've written to the netty channel. This more or less follows the channel's + * high/low water marks and ultimately the 'writablility' status of the channel. Unfortunately there's + * no notification mechanism that can poke a producer to let it know when the channel becomes writable + * (after it was unwritable); hence, the use of a {@link Semaphore}. 
+ */ + private final Semaphore channelRateLimiter; + + /** + * This *must* be the owning {@link ByteBuf} for the {@link BufferedDataOutputStreamPlus#buffer} + */ + private ByteBuf currentBuf; + + private ByteBufDataOutputStreamPlus(StreamSession session, Channel channel, ByteBuf buffer, int bufferSize) + { + super(buffer.nioBuffer(0, bufferSize)); + this.session = session; + this.channel = channel; + this.currentBuf = buffer; + this.bufferSize = bufferSize; + + channelRateLimiter = new Semaphore(channel.config().getWriteBufferHighWaterMark(), true); + } + + @Override + protected WritableByteChannel newDefaultChannel() + { + return new WritableByteChannel() + { + @Override + public int write(ByteBuffer src) throws IOException + { + assert src == buffer; + int size = src.position(); + doFlush(size); + return size; + } + + @Override + public boolean isOpen() + { + return channel.isOpen(); + } + + @Override + public void close() + { } + }; + } + + public static ByteBufDataOutputStreamPlus create(StreamSession session, Channel channel, int bufferSize) + { + ByteBuf buf = channel.alloc().directBuffer(bufferSize, bufferSize); + return new ByteBufDataOutputStreamPlus(session, channel, buf, bufferSize); + } + + /** + * Writes the incoming buffer directly to the backing {@link #channel}, without copying to the intermediate {@link #buffer}. 
+ */ + public ChannelFuture writeToChannel(ByteBuf buf) throws IOException + { + doFlush(buffer.position()); + + int byteCount = buf.readableBytes(); + if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 5, TimeUnit.MINUTES)) + throw new IOException("outbound channel was not writable"); + + // the (possibly naive) assumption that we should always flush after each incoming buf + ChannelFuture channelFuture = channel.writeAndFlush(buf); + channelFuture.addListener(future -> handleBuffer(future, byteCount)); + return channelFuture; + } + + /** + * Writes the incoming buffer directly to the backing {@link #channel}, without copying to the intermediate {@link #buffer}. + * The incoming buffer will be automatically released when the netty channel invokes the listeners of success/failure to + * send the buffer. + */ + public ChannelFuture writeToChannel(ByteBuffer buffer) throws IOException + { + ChannelFuture channelFuture = writeToChannel(Unpooled.wrappedBuffer(buffer)); + channelFuture.addListener(future -> FileUtils.clean(buffer)); + return channelFuture; + } + + @Override + protected void doFlush(int count) throws IOException + { + // flush the current backing write buffer only if there's any pending data + if (buffer.position() > 0 && channel.isOpen()) + { + int byteCount = buffer.position(); + currentBuf.writerIndex(byteCount); + + if (!Uninterruptibles.tryAcquireUninterruptibly(channelRateLimiter, byteCount, 2, TimeUnit.MINUTES)) + throw new IOException("outbound channel was not writable"); + + channel.writeAndFlush(currentBuf).addListener(future -> handleBuffer(future, byteCount)); + currentBuf = channel.alloc().directBuffer(bufferSize, bufferSize); + buffer = currentBuf.nioBuffer(0, bufferSize); + } + } + + /** + * Handles the result of publishing a buffer to the channel. + * + * Note: this will be executed on the event loop. + */ + private void handleBuffer(Future<? 
super Void> future, int bytesWritten) + { + channelRateLimiter.release(bytesWritten); + + if (!future.isSuccess() && channel.isOpen()) + session.onError(future.cause()); + } + + public ByteBufAllocator getAllocator() + { + return channel.alloc(); + } + + /** + * {@inheritDoc} + * + * Flush any last buffered (if the channel is open), and release any buffers. *Not* responsible for closing + * the netty channel as we might use it again for transferring more files. + * + * Note: should be called on the producer thread, not the netty event loop. + */ + @Override + public void close() throws IOException + { + doFlush(0); + if (currentBuf.refCnt() > 0) + currentBuf.release(); + currentBuf = null; + buffer = null; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java index 5ea03dc..7a8303c 100644 --- a/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java +++ b/src/java/org/apache/cassandra/net/async/InboundHandshakeHandler.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; +import io.netty.channel.AdaptiveRecvByteBufAllocator; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; @@ -25,6 +26,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage; import org.apache.cassandra.net.async.HandshakeProtocol.SecondHandshakeMessage; import org.apache.cassandra.net.async.HandshakeProtocol.ThirdHandshakeMessage; +import org.apache.cassandra.streaming.async.StreamingInboundHandler; +import org.apache.cassandra.streaming.messages.StreamMessage; /** 
* 'Server'-side component that negotiates the internode handshake when establishing a new connection. @@ -36,13 +39,13 @@ class InboundHandshakeHandler extends ByteToMessageDecoder { private static final Logger logger = LoggerFactory.getLogger(NettyFactory.class); - enum State { START, AWAITING_HANDSHAKE_BEGIN, AWAIT_STREAM_START_RESPONSE, AWAIT_MESSAGING_START_RESPONSE, MESSAGING_HANDSHAKE_COMPLETE, HANDSHAKE_FAIL } + enum State { START, AWAITING_HANDSHAKE_BEGIN, AWAIT_MESSAGING_START_RESPONSE, HANDSHAKE_COMPLETE, HANDSHAKE_FAIL } private State state; private final IInternodeAuthenticator authenticator; - private boolean hasAuthenticated; + private boolean hasAuthenticated; /** * The peer's declared messaging version. */ @@ -160,9 +163,16 @@ class InboundHandshakeHandler extends ByteToMessageDecoder if (msg.mode == NettyFactory.Mode.STREAMING) { - // TODO fill in once streaming is moved to netty - ctx.close(); - return State.AWAIT_STREAM_START_RESPONSE; + // streaming connections are per-session and have a fixed version. we can't do anything with a wrong-version stream connection, so drop it. + if (version != StreamMessage.CURRENT_VERSION) + { + logger.warn("Received stream using protocol version %d (my version %d). Terminating connection", version, MessagingService.current_version); + ctx.close(); + return State.HANDSHAKE_FAIL; + } + + setupStreamingPipeline(ctx, version); + return State.HANDSHAKE_COMPLETE; } else { @@ -195,6 +205,18 @@ class InboundHandshakeHandler extends ByteToMessageDecoder } } + private void setupStreamingPipeline(ChannelHandlerContext ctx, int protocolVersion) + { + ChannelPipeline pipeline = ctx.pipeline(); + InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress(); + pipeline.addLast(NettyFactory.instance.streamingGroup, "streamInbound", new StreamingInboundHandler(address, protocolVersion, null)); + pipeline.remove(this); + + // pass a custom recv ByteBuf allocator to the channel. 
the default recv ByteBuf size is 1k, but in streaming we're + // dealing with large bulk blocks of data, let's default to larger sizes + ctx.channel().config().setRecvByteBufAllocator(new AdaptiveRecvByteBufAllocator(1 << 8, 1 << 13, 1 << 16)); + } + /** * Handles the third (and last) message in the internode messaging handshake protocol. Grabs the protocol version and * IP addr the peer wants to use. @@ -227,7 +249,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder logger.trace("Set version for {} to {} (will use {})", from, maxVersion, MessagingService.instance().getVersion(from)); setupMessagingPipeline(ctx.pipeline(), from, compressed, version); - return State.MESSAGING_HANDSHAKE_COMPLETE; + return State.HANDSHAKE_COMPLETE; } @VisibleForTesting @@ -245,7 +267,7 @@ class InboundHandshakeHandler extends ByteToMessageDecoder { // we're not really racing on the handshakeTimeout as we're in the event loop, // but, hey, defensive programming is beautiful thing! - if (state == State.MESSAGING_HANDSHAKE_COMPLETE || (handshakeTimeout != null && handshakeTimeout.isCancelled())) + if (state == State.HANDSHAKE_COMPLETE || (handshakeTimeout != null && handshakeTimeout.isCancelled())) return; state = State.HANDSHAKE_FAIL; http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/NettyFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/NettyFactory.java b/src/java/org/apache/cassandra/net/async/NettyFactory.java index 13d8810..762c39b 100644 --- a/src/java/org/apache/cassandra/net/async/NettyFactory.java +++ b/src/java/org/apache/cassandra/net/async/NettyFactory.java @@ -40,6 +40,7 @@ import io.netty.util.concurrent.DefaultThreadFactory; import io.netty.util.concurrent.EventExecutor; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.Slf4JLoggerFactory; + import 
net.jpountz.lz4.LZ4Factory; import net.jpountz.xxhash.XXHashFactory; import org.apache.cassandra.auth.IInternodeAuthenticator; @@ -69,12 +70,18 @@ public final class NettyFactory private static final int LZ4_HASH_SEED = 0x9747b28c; + /** + * Default seed value for xxhash. + */ + public static final int XXHASH_DEFAULT_SEED = 0x9747b28c; + public enum Mode { MESSAGING, STREAMING } private static final String SSL_CHANNEL_HANDLER_NAME = "ssl"; - static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor"; - static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor"; - private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + public static final String INBOUND_COMPRESSOR_HANDLER_NAME = "inboundCompressor"; + public static final String OUTBOUND_COMPRESSOR_HANDLER_NAME = "outboundCompressor"; + public static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; + public static final String INBOUND_STREAM_HANDLER_NAME = "inboundStreamHandler"; /** a useful addition for debugging; simply set to true to get more data in your logs */ private static final boolean WIRETRACE = false; @@ -113,6 +120,7 @@ public final class NettyFactory private final EventLoopGroup inboundGroup; private final EventLoopGroup outboundGroup; + public final EventLoopGroup streamingGroup; /** * Constructor that allows modifying the {@link NettyFactory#useEpoll} for testing purposes. 
Otherwise, use the @@ -126,6 +134,7 @@ public final class NettyFactory "MessagingService-NettyAcceptor-Threads", false); inboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyInbound-Threads", false); outboundGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "MessagingService-NettyOutbound-Threads", true); + streamingGroup = getEventLoopGroup(useEpoll, FBUtilities.getAvailableProcessors(), "Streaming-Netty-Threads", false); } /** @@ -257,7 +266,8 @@ public final class NettyFactory SslContext sslContext = SSLFactory.getSslContext(encryptionOptions, true, true); SslHandler sslHandler = sslContext.newHandler(channel.alloc()); logger.trace("creating inbound netty SslContext: context={}, engine={}", sslContext.getClass().getName(), sslHandler.engine().getClass().getName()); - pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler); } + pipeline.addFirst(SSL_CHANNEL_HANDLER_NAME, sslHandler); + } if (WIRETRACE) pipeline.addLast("logger", new LoggingHandler(LogLevel.INFO)); @@ -279,13 +289,14 @@ public final class NettyFactory * Create the {@link Bootstrap} for connecting to a remote peer. This method does <b>not</b> attempt to connect to the peer, * and thus does not block. */ + @VisibleForTesting public Bootstrap createOutboundBootstrap(OutboundConnectionParams params) { logger.debug("creating outbound bootstrap to peer {}, compression: {}, encryption: {}, coalesce: {}", params.connectionId.connectionAddress(), params.compress, encryptionLogStatement(params.encryptionOptions), params.coalescingStrategy.isPresent() ? params.coalescingStrategy.get() : CoalescingStrategies.Strategy.DISABLED); - Class<? extends Channel> transport = useEpoll ? EpollSocketChannel.class : NioSocketChannel.class; - Bootstrap bootstrap = new Bootstrap().group(outboundGroup) + Class<? extends Channel> transport = useEpoll ? 
EpollSocketChannel.class : NioSocketChannel.class; + Bootstrap bootstrap = new Bootstrap().group(params.mode == Mode.MESSAGING ? outboundGroup : streamingGroup) .channel(transport) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000) .option(ChannelOption.SO_KEEPALIVE, true) @@ -349,6 +360,7 @@ public final class NettyFactory acceptGroup.shutdownGracefully(); outboundGroup.shutdownGracefully(); inboundGroup.shutdownGracefully(); + streamingGroup.shutdownGracefully(); } static Lz4FrameEncoder createLz4Encoder(int protocolVersion) http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java index 24dc5ff..c834bd4 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java +++ b/src/java/org/apache/cassandra/net/async/OutboundConnectionIdentifier.java @@ -32,7 +32,7 @@ public class OutboundConnectionIdentifier { enum ConnectionType { - GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE + GOSSIP, LARGE_MESSAGE, SMALL_MESSAGE, STREAM } /** @@ -99,6 +99,15 @@ public class OutboundConnectionIdentifier } /** + * Creates an identifier for a streaming connection, using the remote "identifying" address as its connection + * address. + */ + public static OutboundConnectionIdentifier stream(InetSocketAddress localAddr, InetSocketAddress remoteAddr) + { + return new OutboundConnectionIdentifier(localAddr, remoteAddr, ConnectionType.STREAM); + } + + /** * Returns a newly created connection identifier to the same remote that this identifier, but using the provided * address as connection address. 
* @@ -106,7 +115,7 @@ public class OutboundConnectionIdentifier * @return a newly created connection identifier that differs from this one only by using {@code remoteConnectionAddr} * as connection address to the remote. */ - OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr) + public OutboundConnectionIdentifier withNewConnectionAddress(InetSocketAddress remoteConnectionAddr) { return new OutboundConnectionIdentifier(localAddr, remoteAddr, remoteConnectionAddr, connectionType); } @@ -114,7 +123,7 @@ public class OutboundConnectionIdentifier /** * The local node address. */ - InetAddress local() + public InetAddress local() { return localAddr.getAddress(); } @@ -122,7 +131,7 @@ public class OutboundConnectionIdentifier /** * The remote node identifying address (the one to use for anything else than connecting to the node). */ - InetSocketAddress remoteAddress() + public InetSocketAddress remoteAddress() { return remoteAddr; } @@ -130,7 +139,7 @@ public class OutboundConnectionIdentifier /** * The remote node identifying address (the one to use for anything else than connecting to the node). */ - InetAddress remote() + public InetAddress remote() { return remoteAddr.getAddress(); } @@ -138,7 +147,7 @@ public class OutboundConnectionIdentifier /** * The remote node connection address (the one to use to actually connect to the remote, and only that). 
*/ - InetSocketAddress connectionAddress() + public InetSocketAddress connectionAddress() { return remoteConnectionAddr; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java index 703549a..c555bed 100644 --- a/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java +++ b/src/java/org/apache/cassandra/net/async/OutboundHandshakeHandler.java @@ -36,6 +36,7 @@ import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.async.HandshakeProtocol.FirstHandshakeMessage; @@ -95,7 +96,9 @@ public class OutboundHandshakeHandler extends ByteToMessageDecoder /** * {@inheritDoc} * - * Invoked when the channel is made active, and sends out the {@link FirstHandshakeMessage} + * Invoked when the channel is made active, and sends out the {@link FirstHandshakeMessage}. + * In the case of streaming, we do not require a full bi-directional handshake; the initial message, + * containing the streaming protocol version, is all that is required. 
*/ @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception @@ -103,6 +106,10 @@ public class OutboundHandshakeHandler extends ByteToMessageDecoder FirstHandshakeMessage msg = new FirstHandshakeMessage(messagingVersion, mode, params.compress); logger.trace("starting handshake with peer {}, msg = {}", connectionId.connectionAddress(), msg); ctx.writeAndFlush(msg.encode(ctx.alloc())).addListener(future -> firstHandshakeMessageListener(future, ctx)); + + if (mode == NettyFactory.Mode.STREAMING) + ctx.pipeline().remove(this); + ctx.fireChannelActive(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java new file mode 100644 index 0000000..580bc03 --- /dev/null +++ b/src/java/org/apache/cassandra/net/async/RebufferingByteBufDataInputPlus.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.net.async; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelConfig; +import io.netty.util.ReferenceCountUtil; +import org.apache.cassandra.io.util.RebufferingInputStream; + +public class RebufferingByteBufDataInputPlus extends RebufferingInputStream implements ReadableByteChannel +{ + /** + * The parent, or owning, buffer of the current buffer being read from ({@link super#buffer}). + */ + private ByteBuf currentBuf; + + private final BlockingQueue<ByteBuf> queue; + + /** + * The count of live bytes in all {@link ByteBuf}s held by this instance. + */ + private final AtomicInteger queuedByteCount; + + private final int lowWaterMark; + private final int highWaterMark; + private final ChannelConfig channelConfig; + + private volatile boolean closed; + + public RebufferingByteBufDataInputPlus(int lowWaterMark, int highWaterMark, ChannelConfig channelConfig) + { + super(Unpooled.EMPTY_BUFFER.nioBuffer()); + + if (lowWaterMark > highWaterMark) + throw new IllegalArgumentException(String.format("low water mark is greater than high water mark: %d vs %d", lowWaterMark, highWaterMark)); + + currentBuf = Unpooled.EMPTY_BUFFER; + this.lowWaterMark = lowWaterMark; + this.highWaterMark = highWaterMark; + this.channelConfig = channelConfig; + queue = new LinkedBlockingQueue<>(); + queuedByteCount = new AtomicInteger(); + } + + /** + * Append a {@link ByteBuf} to the end of the internal queue. + * + * Note: it's expected this method is invoked on the netty event loop. 
+ */ + public void append(ByteBuf buf) throws IllegalStateException + { + assert buf != null : "buffer cannot be null"; + + if (closed) + { + ReferenceCountUtil.release(buf); + throw new IllegalStateException("stream is already closed, so cannot add another buffer"); + } + + // this slightly undercounts the live count as it doesn't include the currentBuf's size. + // that's ok as the worst we'll do is allow another buffer in and add it to the queue, + // at which point we'll disable auto-read. this is a tradeoff versus making some other member field + // atomic or volatile. + int queuedCount = queuedByteCount.addAndGet(buf.readableBytes()); + if (channelConfig.isAutoRead() && queuedCount > highWaterMark) + channelConfig.setAutoRead(false); + + queue.add(buf); + } + + /** + * {@inheritDoc} + * + * Release open buffers and poll the {@link #queue} for more data. + * <p> + * This is best, and more or less expected, to be invoked on a consuming thread (not the event loop) + * because if we block on the queue we can't fill it on the event loop (as that's where the buffers are coming from). 
+ */ + @Override + protected void reBuffer() throws IOException + { + currentBuf.release(); + buffer = null; + currentBuf = null; + + // possibly re-enable auto-read, *before* blocking on the queue, because if we block on the queue + // without enabling auto-read we'll block forever :( + if (!channelConfig.isAutoRead() && queuedByteCount.get() < lowWaterMark) + channelConfig.setAutoRead(true); + + try + { + currentBuf = queue.take(); + int bytes; + // if we get an explicitly empty buffer, we treat that as an indicator that the input is closed + if (currentBuf == null || (bytes = currentBuf.readableBytes()) == 0) + { + releaseResources(); + throw new EOFException(); + } + + buffer = currentBuf.nioBuffer(currentBuf.readerIndex(), bytes); + assert buffer.remaining() == bytes; + queuedByteCount.addAndGet(-bytes); + return; + } + catch (InterruptedException ie) + { + // nop - ignore + } + } + + @Override + public int read(ByteBuffer dst) throws IOException + { + int readLength = dst.remaining(); + int remaining = readLength; + + while (remaining > 0) + { + if (closed) + throw new EOFException(); + + if (!buffer.hasRemaining()) + reBuffer(); + int copyLength = Math.min(remaining, buffer.remaining()); + + int originalLimit = buffer.limit(); + buffer.limit(buffer.position() + copyLength); + dst.put(buffer); + buffer.limit(originalLimit); + remaining -= copyLength; + } + + return readLength; + } + + /** + * {@inheritDoc} + * + * As long as this method is invoked on the consuming thread the returned value will be accurate. + */ + @Override + public int available() throws EOFException + { + if (closed) + throw new EOFException(); + + final int availableBytes = queuedByteCount.get() + (buffer != null ? 
buffer.remaining() : 0); + + if (!channelConfig.isAutoRead() && availableBytes < lowWaterMark) + channelConfig.setAutoRead(true); + + return availableBytes; + } + + @Override + public boolean isOpen() + { + return !closed; + } + + /** + * {@inheritDoc} + * + * Note: This should be invoked on the consuming thread. + */ + @Override + public void close() + { + closed = true; + releaseResources(); + } + + private void releaseResources() + { + if (currentBuf != null) + { + if (currentBuf.refCnt() > 0) + currentBuf.release(currentBuf.refCnt()); + currentBuf = null; + buffer = null; + } + + ByteBuf buf; + while ((buf = queue.poll()) != null && buf.refCnt() > 0) + buf.release(buf.refCnt()); + } + + /** + * Mark this stream as closed, but do not release any of the resources. + * + * Note: this is best to be called from the producer thread. + */ + public void markClose() + { + if (!closed) + { + closed = true; + queue.add(Unpooled.EMPTY_BUFFER); + } + } + + /** + * {@inheritDoc} + * + * Note: this is best to be called from the consumer thread. 
+ */ + @Override + public String toString() + { + return new StringBuilder(128).append("RebufferingByteBufDataInputPlus: currentBuf = ").append(currentBuf) + .append(" (super.buffer = ").append(buffer).append(')') + .append(", queuedByteCount = ").append(queuedByteCount) + .append(", queue buffers = ").append(queue) + .append(", closed = ").append(closed) + .toString(); + } + + public ByteBufAllocator getAllocator() + { + return channelConfig.getAllocator(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/security/SSLFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/security/SSLFactory.java b/src/java/org/apache/cassandra/security/SSLFactory.java index 3c1293f..a931f5f 100644 --- a/src/java/org/apache/cassandra/security/SSLFactory.java +++ b/src/java/org/apache/cassandra/security/SSLFactory.java @@ -20,7 +20,6 @@ package org.apache.cassandra.security; import java.io.IOException; import java.io.InputStream; -import java.net.InetAddress; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; @@ -32,7 +31,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLParameters; import javax.net.ssl.SSLSocket; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; @@ -79,53 +77,6 @@ public final class SSLFactory */ private static final AtomicReference<SslContext> serverSslContext = new AtomicReference<>(); - /** Create a socket and connect */ - public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port, InetAddress localAddress, int localPort) throws IOException - { - SSLContext ctx = createSSLContext(options, true); - SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port, localAddress, localPort); - try - { - 
prepareSocket(socket, options); - return socket; - } - catch (IllegalArgumentException e) - { - socket.close(); - throw e; - } - } - - /** Create a socket and connect, using any local address */ - public static SSLSocket getSocket(EncryptionOptions options, InetAddress address, int port) throws IOException - { - SSLContext ctx = createSSLContext(options, true); - SSLSocket socket = (SSLSocket) ctx.getSocketFactory().createSocket(address, port); - try - { - prepareSocket(socket, options); - return socket; - } - catch (IllegalArgumentException e) - { - socket.close(); - throw e; - } - } - - /** Sets relevant socket options specified in encryption settings */ - private static void prepareSocket(SSLSocket socket, EncryptionOptions options) - { - String[] suites = filterCipherSuites(socket.getSupportedCipherSuites(), options.cipher_suites); - if(options.require_endpoint_verification) - { - SSLParameters sslParameters = socket.getSSLParameters(); - sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); - socket.setSSLParameters(sslParameters); - } - socket.setEnabledCipherSuites(suites); - } - /** * Create a JSSE {@link SSLContext}. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index af59733..bab161a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1313,17 +1313,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return DatabaseDescriptor.getTruncateRpcTimeout(); } - public void setStreamingSocketTimeout(int value) - { - DatabaseDescriptor.setStreamingSocketTimeout(value); - logger.info("set streaming socket timeout to {} ms", value); - } - - public int getStreamingSocketTimeout() - { - return DatabaseDescriptor.getStreamingSocketTimeout(); - } - public void setStreamThroughputMbPerSec(int value) { DatabaseDescriptor.setStreamThroughputOutboundMegabitsPerSec(value); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 36c43fd..46b7253 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -502,9 +502,6 @@ public interface StorageServiceMBean extends NotificationEmitter public void setTruncateRpcTimeout(long value); public long getTruncateRpcTimeout(); - public void setStreamingSocketTimeout(int value); - public int getStreamingSocketTimeout(); - public void setStreamThroughputMbPerSec(int value); public int getStreamThroughputMbPerSec(); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java b/src/java/org/apache/cassandra/streaming/ConnectionHandler.java deleted file mode 100644 index 5f734c9..0000000 --- a/src/java/org/apache/cassandra/streaming/ConnectionHandler.java +++ /dev/null @@ -1,428 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.streaming; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.net.Socket; -import java.net.SocketException; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.util.Collection; -import java.util.Comparator; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.netty.util.concurrent.FastThreadLocalThread; -import org.apache.cassandra.io.util.DataOutputStreamPlus; -import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; -import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus; -import org.apache.cassandra.net.IncomingStreamingConnection; -import org.apache.cassandra.streaming.messages.StreamInitMessage; -import org.apache.cassandra.streaming.messages.StreamMessage; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.JVMStabilityInspector; - -/** - * ConnectionHandler manages incoming/outgoing message exchange for the {@link StreamSession}. - * - * <p> - * Internally, ConnectionHandler manages thread to receive incoming {@link StreamMessage} and thread to - * send outgoing message. Messages are encoded/decoded on those thread and handed to - * {@link StreamSession#messageReceived(org.apache.cassandra.streaming.messages.StreamMessage)}. 
- */ -public class ConnectionHandler -{ - private static final Logger logger = LoggerFactory.getLogger(ConnectionHandler.class); - - private final StreamSession session; - - private IncomingMessageHandler incoming; - private OutgoingMessageHandler outgoing; - private final boolean isPreview; - - ConnectionHandler(StreamSession session, int incomingSocketTimeout, boolean isPreview) - { - this.session = session; - this.isPreview = isPreview; - this.incoming = new IncomingMessageHandler(session, incomingSocketTimeout); - this.outgoing = new OutgoingMessageHandler(session); - } - - /** - * Set up incoming message handler and initiate streaming. - * - * This method is called once on initiator. - * - * @throws IOException - */ - @SuppressWarnings("resource") - public void initiate() throws IOException - { - logger.debug("[Stream #{}] Sending stream init for incoming stream", session.planId()); - Socket incomingSocket = session.createConnection(); - incoming.start(incomingSocket, StreamMessage.CURRENT_VERSION, true); - - logger.debug("[Stream #{}] Sending stream init for outgoing stream", session.planId()); - Socket outgoingSocket = session.createConnection(); - outgoing.start(outgoingSocket, StreamMessage.CURRENT_VERSION, true); - } - - /** - * Set up outgoing message handler on receiving side. - * - * @param connection Incoming connection to use for {@link OutgoingMessageHandler}. 
- * @param version Streaming message version - * @throws IOException - */ - public void initiateOnReceivingSide(IncomingStreamingConnection connection, boolean isForOutgoing, int version) throws IOException - { - if (isForOutgoing) - outgoing.start(connection, version); - else - incoming.start(connection, version); - } - - public ListenableFuture<?> close() - { - logger.debug("[Stream #{}] Closing stream connection handler on {}", session.planId(), session.peer); - - ListenableFuture<?> inClosed = closeIncoming(); - ListenableFuture<?> outClosed = closeOutgoing(); - - return Futures.allAsList(inClosed, outClosed); - } - - public ListenableFuture<?> closeOutgoing() - { - return outgoing == null ? Futures.immediateFuture(null) : outgoing.close(); - } - - public ListenableFuture<?> closeIncoming() - { - return incoming == null ? Futures.immediateFuture(null) : incoming.close(); - } - - /** - * Enqueue messages to be sent. - * - * @param messages messages to send - */ - public void sendMessages(Collection<? 
extends StreamMessage> messages) - { - for (StreamMessage message : messages) - sendMessage(message); - } - - public void sendMessage(StreamMessage message) - { - if (outgoing.isClosed()) - throw new RuntimeException("Outgoing stream handler has been closed"); - - if (message.type == StreamMessage.Type.FILE && isPreview) - throw new RuntimeException("Cannot send file messages for preview streaming sessions"); - - outgoing.enqueue(message); - } - - /** - * @return true if outgoing connection is opened and ready to send messages - */ - public boolean isOutgoingConnected() - { - return outgoing != null && !outgoing.isClosed(); - } - - abstract static class MessageHandler implements Runnable - { - protected final StreamSession session; - - protected int protocolVersion; - private final boolean isOutgoingHandler; - protected Socket socket; - - private final AtomicReference<SettableFuture<?>> closeFuture = new AtomicReference<>(); - private IncomingStreamingConnection incomingConnection; - - protected MessageHandler(StreamSession session, boolean isOutgoingHandler) - { - this.session = session; - this.isOutgoingHandler = isOutgoingHandler; - } - - protected abstract String name(); - - @SuppressWarnings("resource") - protected static DataOutputStreamPlus getWriteChannel(Socket socket) throws IOException - { - WritableByteChannel out = socket.getChannel(); - // socket channel is null when encrypted(SSL) - if (out == null) - return new WrappedDataOutputStreamPlus(new BufferedOutputStream(socket.getOutputStream())); - return new BufferedDataOutputStreamPlus(out); - } - - protected static ReadableByteChannel getReadChannel(Socket socket) throws IOException - { - //we do this instead of socket.getChannel() so socketSoTimeout is respected - return Channels.newChannel(socket.getInputStream()); - } - - @SuppressWarnings("resource") - private void sendInitMessage() throws IOException - { - StreamInitMessage message = new StreamInitMessage(FBUtilities.getBroadcastAddress(), - 
session.sessionIndex(), - session.planId(), - session.streamOperation(), - !isOutgoingHandler, - session.keepSSTableLevel(), - session.getPendingRepair(), - session.getPreviewKind()); - ByteBuffer messageBuf = message.createMessage(false, protocolVersion); - DataOutputStreamPlus out = getWriteChannel(socket); - out.write(messageBuf); - out.flush(); - } - - public void start(IncomingStreamingConnection connection, int protocolVersion) throws IOException - { - this.incomingConnection = connection; - start(connection.socket, protocolVersion, false); - } - - public void start(Socket socket, int protocolVersion, boolean initiator) throws IOException - { - this.socket = socket; - this.protocolVersion = protocolVersion; - if (initiator) - sendInitMessage(); - - new FastThreadLocalThread(this, name() + "-" + socket.getRemoteSocketAddress()).start(); - } - - public ListenableFuture<?> close() - { - // Assume it wasn't closed. Not a huge deal if we create a future on a race - SettableFuture<?> future = SettableFuture.create(); - return closeFuture.compareAndSet(null, future) - ? future - : closeFuture.get(); - } - - public boolean isClosed() - { - return closeFuture.get() != null; - } - - protected void signalCloseDone() - { - if (!isClosed()) - close(); - - closeFuture.get().set(null); - - // We can now close the socket - if (incomingConnection != null) - { - //this will close the underlying socket and remove it - //from active MessagingService connections (CASSANDRA-11854) - incomingConnection.close(); - } - else - { - //this is an outgoing connection not registered in the MessagingService - //so we can close the socket directly - try - { - socket.close(); - } - catch (IOException e) - { - // Erroring out while closing shouldn't happen but is not really a big deal, so just log - // it at DEBUG and ignore otherwise. 
- logger.debug("Unexpected error while closing streaming connection", e); - } - } - } - } - - /** - * Incoming streaming message handler - */ - static class IncomingMessageHandler extends MessageHandler - { - private final int socketTimeout; - - IncomingMessageHandler(StreamSession session, int socketTimeout) - { - super(session, false); - this.socketTimeout = socketTimeout; - } - - @Override - public void start(Socket socket, int version, boolean initiator) throws IOException - { - try - { - socket.setSoTimeout(socketTimeout); - } - catch (SocketException e) - { - logger.warn("Could not set incoming socket timeout to {}", socketTimeout, e); - } - super.start(socket, version, initiator); - } - - protected String name() - { - return "STREAM-IN"; - } - - @SuppressWarnings("resource") - public void run() - { - try - { - ReadableByteChannel in = getReadChannel(socket); - while (!isClosed()) - { - // receive message - StreamMessage message = StreamMessage.deserialize(in, protocolVersion, session); - logger.debug("[Stream #{}] Received {}", session.planId(), message); - // Might be null if there is an error during streaming (see FileMessage.deserialize). It's ok - // to ignore here since we'll have asked for a retry. - if (message != null) - { - session.messageReceived(message); - } - } - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - session.onError(t); - } - finally - { - signalCloseDone(); - } - } - } - - /** - * Outgoing file transfer thread - */ - static class OutgoingMessageHandler extends MessageHandler - { - /* - * All out going messages are queued up into messageQueue. - * The size will grow when received streaming request. - * - * Queue is also PriorityQueue so that prior messages can go out fast. 
- */ - private final PriorityBlockingQueue<StreamMessage> messageQueue = new PriorityBlockingQueue<>(64, new Comparator<StreamMessage>() - { - public int compare(StreamMessage o1, StreamMessage o2) - { - return o2.getPriority() - o1.getPriority(); - } - }); - - OutgoingMessageHandler(StreamSession session) - { - super(session, true); - } - - protected String name() - { - return "STREAM-OUT"; - } - - public void enqueue(StreamMessage message) - { - messageQueue.put(message); - } - - @SuppressWarnings("resource") - public void run() - { - try - { - DataOutputStreamPlus out = getWriteChannel(socket); - - StreamMessage next; - while (!isClosed()) - { - if ((next = messageQueue.poll(1, TimeUnit.SECONDS)) != null) - { - logger.debug("[Stream #{}] Sending {}", session.planId(), next); - sendMessage(out, next); - if (next.type == StreamMessage.Type.SESSION_FAILED) - close(); - } - } - - // Sends the last messages on the queue - while ((next = messageQueue.poll()) != null) - sendMessage(out, next); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (Throwable e) - { - session.onError(e); - } - finally - { - signalCloseDone(); - } - } - - private void sendMessage(DataOutputStreamPlus out, StreamMessage message) - { - try - { - StreamMessage.serialize(message, out, protocolVersion, session); - out.flush(); - message.sent(); - } - catch (SocketException e) - { - session.onError(e); - close(); - } - catch (IOException e) - { - session.onError(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java index d88d63c..d9ed8be 100644 --- a/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java +++ 
b/src/java/org/apache/cassandra/streaming/DefaultConnectionFactory.java @@ -15,83 +15,93 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.cassandra.streaming; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; + +import javax.annotation.Nullable; +import com.google.common.util.concurrent.Uninterruptibles; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.config.Config; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.WriteBufferWaterMark; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.security.SSLFactory; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions; +import org.apache.cassandra.net.async.NettyFactory; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier; +import org.apache.cassandra.net.async.OutboundConnectionParams; public class DefaultConnectionFactory implements StreamConnectionFactory { private static final Logger logger = LoggerFactory.getLogger(DefaultConnectionFactory.class); + private static final int DEFAULT_CHANNEL_BUFFER_SIZE = 1 << 22; + + private static final long MAX_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(30); private static final int MAX_CONNECT_ATTEMPTS = 3; - /** - * Connect to peer and start exchanging message. - * When connect attempt fails, this retries for maximum of MAX_CONNECT_ATTEMPTS times. - * - * @param peer the peer to connect to. - * @return the created socket. - * - * @throws IOException when connection failed. 
- */ - public Socket createConnection(InetAddress peer) throws IOException + @Override + public Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException { - int attempts = 0; + ServerEncryptionOptions encryptionOptions = DatabaseDescriptor.getServerEncryptionOptions(); + + if (encryptionOptions.internode_encryption == ServerEncryptionOptions.InternodeEncryption.none) + encryptionOptions = null; + + return createConnection(connectionId, protocolVersion, encryptionOptions); + } + + protected Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion, @Nullable ServerEncryptionOptions encryptionOptions) throws IOException + { + // this is the amount of data to allow in memory before netty sets the channel writability flag to false + int channelBufferSize = DEFAULT_CHANNEL_BUFFER_SIZE; + WriteBufferWaterMark waterMark = new WriteBufferWaterMark(channelBufferSize >> 2, channelBufferSize); + + int sendBufferSize = DatabaseDescriptor.getInternodeSendBufferSize() > 0 + ? 
DatabaseDescriptor.getInternodeSendBufferSize() + : OutboundConnectionParams.DEFAULT_SEND_BUFFER_SIZE; + + OutboundConnectionParams params = OutboundConnectionParams.builder() + .connectionId(connectionId) + .encryptionOptions(encryptionOptions) + .mode(NettyFactory.Mode.STREAMING) + .protocolVersion(protocolVersion) + .sendBufferSize(sendBufferSize) + .waterMark(waterMark) + .build(); + + Bootstrap bootstrap = NettyFactory.instance.createOutboundBootstrap(params); + + int connectionAttemptCount = 0; + long now = System.nanoTime(); + final long end = now + MAX_WAIT_TIME_NANOS; + final Channel channel; while (true) { - try - { - Socket socket = newSocket(peer); - socket.setSoTimeout(DatabaseDescriptor.getStreamingSocketTimeout()); - socket.setKeepAlive(true); - return socket; - } - catch (IOException e) + ChannelFuture channelFuture = bootstrap.connect(); + channelFuture.awaitUninterruptibly(end - now, TimeUnit.NANOSECONDS); // end and now are System.nanoTime() values, so the remaining budget is in nanoseconds + if (channelFuture.isSuccess()) { - if (++attempts >= MAX_CONNECT_ATTEMPTS) - throw e; - - long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, attempts); - logger.warn("Failed attempt {} to connect to {}. Retrying in {} ms. ({})", attempts, peer, waitms, e.getMessage()); - try - { - Thread.sleep(waitms); - } - catch (InterruptedException wtf) - { - throw new IOException("interrupted", wtf); - } + channel = channelFuture.channel(); + break; } - } - } - // TODO this is deliberately copied from (the now former) OutboundTcpConnectionPool, for CASSANDRA-8457. - // to be replaced in CASSANDRA-12229 (make streaming use 8457) - public static Socket newSocket(InetAddress endpoint) throws IOException - { - // zero means 'bind on any available port.' 
- if (MessagingService.isEncryptedConnection(endpoint)) - { - return SSLFactory.getSocket(DatabaseDescriptor.getServerEncryptionOptions(), endpoint, DatabaseDescriptor.getSSLStoragePort()); - } - else - { - SocketChannel channel = SocketChannel.open(); - channel.connect(new InetSocketAddress(endpoint, DatabaseDescriptor.getStoragePort())); - return channel.socket(); + connectionAttemptCount++; + now = System.nanoTime(); + if (connectionAttemptCount == MAX_CONNECT_ATTEMPTS || end - now <= 0) + throw new IOException("failed to connect to " + connectionId + " for streaming data", channelFuture.cause()); + + long waitms = DatabaseDescriptor.getRpcTimeout() * (long)Math.pow(2, connectionAttemptCount); + logger.warn("Failed attempt {} to connect to {}. Retrying in {} ms.", connectionAttemptCount, connectionId, waitms); + Uninterruptibles.sleepUninterruptibly(waitms, TimeUnit.MILLISECONDS); } + + return channel; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java index dd99611..4cfe41e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java +++ b/src/java/org/apache/cassandra/streaming/StreamConnectionFactory.java @@ -15,16 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.cassandra.streaming; import java.io.IOException; -import java.net.InetAddress; -import java.net.Socket; -/** - * Interface that creates connection used by streaming. 
- */ +import io.netty.channel.Channel; +import org.apache.cassandra.net.async.OutboundConnectionIdentifier; + public interface StreamConnectionFactory { - Socket createConnection(InetAddress peer) throws IOException; + Channel createConnection(OutboundConnectionIdentifier connectionId, int protocolVersion) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java index 9059f45..bb8c702 100644 --- a/src/java/org/apache/cassandra/streaming/StreamCoordinator.java +++ b/src/java/org/apache/cassandra/streaming/StreamCoordinator.java @@ -37,8 +37,10 @@ public class StreamCoordinator { private static final Logger logger = LoggerFactory.getLogger(StreamCoordinator.class); - // Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the - // streaming is handled directly by the ConnectionHandler's incoming and outgoing threads. + /** + * Executor strictly for establishing the initial connections. Once we're connected to the other end the rest of the + * streaming is handled directly by the {@link StreamingMessageSender}'s incoming and outgoing threads. 
+ */ private static final DebuggableThreadPoolExecutor streamExecutor = DebuggableThreadPoolExecutor.createWithFixedPoolSize("StreamConnectionEstablisher", FBUtilities.getAvailableProcessors()); private final boolean connectSequentially; @@ -55,8 +57,8 @@ public class StreamCoordinator boolean connectSequentially, UUID pendingRepair, PreviewKind previewKind) { this.connectionsPerHost = connectionsPerHost; - this.factory = factory; this.keepSSTableLevel = keepSSTableLevel; + this.factory = factory; this.connectSequentially = connectSequentially; this.pendingRepair = pendingRepair; this.previewKind = previewKind; @@ -163,6 +165,11 @@ public class StreamCoordinator return getOrCreateHostData(peer).getOrCreateSessionById(peer, id, connecting); } + public StreamSession getSessionById(InetAddress peer, int id) + { + return getHostData(peer).getSessionById(id); + } + public synchronized void updateProgress(ProgressInfo info) { getHostData(info.peer).updateProgress(info); @@ -274,8 +281,8 @@ public class StreamCoordinator private class HostStreamingData { - private Map<Integer, StreamSession> streamSessions = new HashMap<>(); - private Map<Integer, SessionInfo> sessionInfos = new HashMap<>(); + private final Map<Integer, StreamSession> streamSessions = new HashMap<>(); + private final Map<Integer, SessionInfo> sessionInfos = new HashMap<>(); private int lastReturned = -1; @@ -333,6 +340,11 @@ public class StreamCoordinator return session; } + public StreamSession getSessionById(int id) + { + return streamSessions.get(id); + } + public void updateProgress(ProgressInfo info) { sessionInfos.get(info.sessionIndex).updateProgress(info); http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamManager.java b/src/java/org/apache/cassandra/streaming/StreamManager.java index 
52652c0..a44f02e 100644 --- a/src/java/org/apache/cassandra/streaming/StreamManager.java +++ b/src/java/org/apache/cassandra/streaming/StreamManager.java @@ -21,7 +21,6 @@ import java.net.InetAddress; import java.util.Map; import java.util.Set; import java.util.UUID; - import javax.management.ListenerNotFoundException; import javax.management.MBeanNotificationInfo; import javax.management.NotificationFilter; @@ -136,7 +135,7 @@ public class StreamManager implements StreamManagerMBean initiatedStreams.put(result.planId, result); } - public void registerReceiving(final StreamResultFuture result) + public StreamResultFuture registerReceiving(final StreamResultFuture result) { result.addEventListener(notifier); // Make sure we remove the stream on completion (whether successful or not) @@ -148,7 +147,8 @@ public class StreamManager implements StreamManagerMBean } }, MoreExecutors.directExecutor()); - receivingStreams.put(result.planId, result); + StreamResultFuture previous = receivingStreams.putIfAbsent(result.planId, result); + return previous == null ? 
result : previous; } public StreamResultFuture getReceivingStream(UUID planId) @@ -175,4 +175,22 @@ public class StreamManager implements StreamManagerMBean { return notifier.getNotificationInfo(); } + + public StreamSession findSession(InetAddress peer, UUID planId, int sessionIndex) + { + StreamSession session = findSession(initiatedStreams, peer, planId, sessionIndex); + if (session != null) + return session; + + return findSession(receivingStreams, peer, planId, sessionIndex); + } + + private StreamSession findSession(Map<UUID, StreamResultFuture> streams, InetAddress peer, UUID planId, int sessionIndex) + { + StreamResultFuture streamResultFuture = streams.get(planId); + if (streamResultFuture == null) + return null; + + return streamResultFuture.getSession(peer, sessionIndex); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/fc92db2b/src/java/org/apache/cassandra/streaming/StreamPlan.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamPlan.java b/src/java/org/apache/cassandra/streaming/StreamPlan.java index 05a8d30..213f74b 100644 --- a/src/java/org/apache/cassandra/streaming/StreamPlan.java +++ b/src/java/org/apache/cassandra/streaming/StreamPlan.java @@ -33,7 +33,7 @@ import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR */ public class StreamPlan { - public static final String[] EMPTY_COLUMN_FAMILIES = new String[0]; + private static final String[] EMPTY_COLUMN_FAMILIES = new String[0]; private final UUID planId = UUIDGen.getTimeUUID(); private final StreamOperation streamOperation; private final List<StreamEventHandler> handlers = new ArrayList<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org