This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e995c3f  Allow sending Entire SSTables over SSL
e995c3f is described below

commit e995c3fcc7479e82f34721d9634093a807ff06bd
Author: Dinesh Joshi <dinesh.jo...@apple.com>
AuthorDate: Sat Apr 18 19:40:43 2020 -0700

    Allow sending Entire SSTables over SSL
    
    Patch by Dinesh Joshi; Reviewed by Joseph Lynch for CASSANDRA-15740
---
 .../cassandra/config/DatabaseDescriptor.java       |  9 ------
 .../cassandra/net/AsyncStreamingOutputPlus.java    | 35 ++++++++++++++++++++--
 2 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 85bfa88..d4ee34b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -773,15 +773,6 @@ public class DatabaseDescriptor
                                             
"server_encryption_options.internode_encryption = " + 
conf.server_encryption_options.internode_encryption, false);
         }
 
-        if (conf.stream_entire_sstables)
-        {
-            if (conf.server_encryption_options.enabled || 
conf.server_encryption_options.optional)
-            {
-                logger.warn("Internode encryption enabled. Disabling zero copy 
SSTable transfers for streaming.");
-                conf.stream_entire_sstables = false;
-            }
-        }
-
         if (conf.max_value_size_in_mb <= 0)
             throw new ConfigurationException("max_value_size_in_mb must be 
positive", false);
         else if (conf.max_value_size_in_mb >= 2048)
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java 
b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index a2dff41..e685584 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.WriteBufferWaterMark;
+import io.netty.handler.ssl.SslHandler;
 import org.apache.cassandra.io.compress.BufferType;
 import org.apache.cassandra.io.util.DataOutputStreamPlus;
 import org.apache.cassandra.net.SharedDefaultFileRegion.SharedFileChannel;
@@ -171,10 +172,40 @@ public class AsyncStreamingOutputPlus extends 
AsyncChannelOutputPlus
     {
         // write files in 1MiB chunks, since there may be blocking work 
performed to fetch it from disk,
         // the data is never brought in process and is gated by the wire anyway
-        return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 << 20);
+        if (channel.pipeline().get(SslHandler.class) != null)
+            return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 << 
20);
+        else
+            return writeFileToChannelZeroCopy(file, limiter, 1 << 20, 1 << 20, 
2 << 20);
     }
 
-    public long writeFileToChannel(FileChannel file, StreamRateLimiter 
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+    public long writeFileToChannel(FileChannel fc, StreamRateLimiter limiter, 
int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+    {
+        final long length = fc.size();
+        long bytesTransferred = 0;
+        while (bytesTransferred < length)
+        {
+            int toWrite = (int) min(batchSize, length - bytesTransferred);
+            final long position = bytesTransferred;
+
+            writeToChannel(bufferSupplier -> {
+                ByteBuffer outBuffer = bufferSupplier.get(toWrite);
+                long read = fc.read(outBuffer, position);
+                if (read != toWrite)
+                    throw new IOException(String.format("could not read 
required number of bytes from " +
+                                                        "file to be streamed: 
read %d bytes, wanted %d bytes",
+                                                        read, toWrite));
+                outBuffer.flip();
+            }, limiter);
+
+            if (logger.isTraceEnabled())
+                logger.trace("Writing {} bytes at position {} of {}", toWrite, 
bytesTransferred, length);
+            bytesTransferred += toWrite;
+        }
+
+        return bytesTransferred;
+    }
+
+    public long writeFileToChannelZeroCopy(FileChannel file, StreamRateLimiter 
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
     {
         final long length = file.size();
         long bytesTransferred = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to