This is an automated email from the ASF dual-hosted git repository. djoshi pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
     new e995c3f  Allow sending Entire SSTables over SSL
e995c3f is described below

commit e995c3fcc7479e82f34721d9634093a807ff06bd
Author: Dinesh Joshi <dinesh.jo...@apple.com>
AuthorDate: Sat Apr 18 19:40:43 2020 -0700

    Allow sending Entire SSTables over SSL

    Patch by Dinesh Joshi; Reviewed by Joseph Lynch for CASSANDRA-15740
---
 .../cassandra/config/DatabaseDescriptor.java       |  9 ------
 .../cassandra/net/AsyncStreamingOutputPlus.java    | 35 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 85bfa88..d4ee34b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -773,15 +773,6 @@ public class DatabaseDescriptor
                                              "server_encryption_options.internode_encryption = " + conf.server_encryption_options.internode_encryption, false);
         }

-        if (conf.stream_entire_sstables)
-        {
-            if (conf.server_encryption_options.enabled || conf.server_encryption_options.optional)
-            {
-                logger.warn("Internode encryption enabled. Disabling zero copy SSTable transfers for streaming.");
-                conf.stream_entire_sstables = false;
-            }
-        }
-
         if (conf.max_value_size_in_mb <= 0)
             throw new ConfigurationException("max_value_size_in_mb must be positive", false);
         else if (conf.max_value_size_in_mb >= 2048)
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index a2dff41..e685584 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.WriteBufferWaterMark;
+import io.netty.handler.ssl.SslHandler;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.SharedDefaultFileRegion.SharedFileChannel;
@@ -171,10 +172,40 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
     {
         // write files in 1MiB chunks, since there may be blocking work performed to fetch it from disk,
         // the data is never brought in process and is gated by the wire anyway
-        return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 << 20);
+        if (channel.pipeline().get(SslHandler.class) != null)
+            return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 << 20);
+        else
+            return writeFileToChannelZeroCopy(file, limiter, 1 << 20, 1 << 20, 2 << 20);
     }

-    public long writeFileToChannel(FileChannel file, StreamRateLimiter limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+    public long writeFileToChannel(FileChannel fc, StreamRateLimiter limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+    {
+        final long length = fc.size();
+        long bytesTransferred = 0;
+        while (bytesTransferred < length)
+        {
+            int toWrite = (int) min(batchSize, length - bytesTransferred);
+            final long position = bytesTransferred;
+
+            writeToChannel(bufferSupplier -> {
+                ByteBuffer outBuffer = bufferSupplier.get(toWrite);
+                long read = fc.read(outBuffer, position);
+                if (read != toWrite)
+                    throw new IOException(String.format("could not read required number of bytes from " +
+                                                        "file to be streamed: read %d bytes, wanted %d bytes",
+                                                        read, toWrite));
+                outBuffer.flip();
+            }, limiter);
+
+            if (logger.isTraceEnabled())
+                logger.trace("Writing {} bytes at position {} of {}", toWrite, bytesTransferred, length);
+            bytesTransferred += toWrite;
+        }
+
+        return bytesTransferred;
+    }
+
+    public long writeFileToChannelZeroCopy(FileChannel file, StreamRateLimiter limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
     {
         final long length = file.size();
         long bytesTransferred = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org