This is an automated email from the ASF dual-hosted git repository.

djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7369194  Close channel and reduce buffer allocation during entire sstable streaming with SSL
7369194 is described below

commit 73691944c0ff9b01679cf5a6fe5944ad4c416509
Author: Zhao Yang <[email protected]>
AuthorDate: Wed Jun 24 18:37:47 2020 +0800

    Close channel and reduce buffer allocation during entire sstable streaming with SSL
    
    Patch by Zhao Yang; Reviewed by Caleb Rackliffe and Dinesh Joshi for CASSANDRA-15900
---
 CHANGES.txt                                        |  1 +
 .../cassandra/net/AsyncStreamingOutputPlus.java    | 66 ++++++++++++++--------
 .../net/AsyncStreamingOutputPlusTest.java          | 58 +++++++++++++++++++
 .../unit/org/apache/cassandra/net/TestChannel.java |  4 +-
 4 files changed, 101 insertions(+), 28 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 8fafb7d..c3fdf4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha5
+ * Close channel and reduce buffer allocation during entire sstable streaming with SSL (CASSANDRA-15900)
  * Prune expired messages less frequently in internode messaging (CASSANDRA-15700)
  * Fix Ec2Snitch handling of legacy mode for dc names matching both formats, eg "us-west-2" (CASSANDRA-15878)
  * Add support for server side DESCRIBE statements (CASSANDRA-14825)
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index e685584..680a9d3 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -23,11 +23,13 @@ import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
 import io.netty.channel.WriteBufferWaterMark;
 import io.netty.handler.ssl.SslHandler;
 import org.apache.cassandra.io.compress.BufferType;
@@ -161,51 +163,65 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
     }
 
     /**
+     * Writes all data in file channel to stream: <br>
+     * * For zero-copy-streaming, 1MiB at a time, with at most 2MiB in flight 
at once. <br>
+     * * For streaming with SSL, 64kb at a time, with at most 32+64kb (default 
low water mark + batch size) in flight. <br>
      * <p>
-     * Writes all data in file channel to stream, 1MiB at a time, with at most 
2MiB in flight at once.
-     * This method takes ownership of the provided {@code FileChannel}.
+     * This method takes ownership of the provided {@link FileChannel}.
      * <p>
      * WARNING: this method blocks only for permission to write to the netty 
channel; it exits before
-     * the write is flushed to the network.
+     * the {@link FileRegion}(zero-copy) or {@link ByteBuffer}(ssl) is flushed 
to the network.
      */
     public long writeFileToChannel(FileChannel file, StreamRateLimiter 
limiter) throws IOException
     {
-        // write files in 1MiB chunks, since there may be blocking work 
performed to fetch it from disk,
-        // the data is never brought in process and is gated by the wire anyway
         if (channel.pipeline().get(SslHandler.class) != null)
-            return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 << 
20);
+            // each batch is loaded into ByteBuffer, 64kb is more BufferPool 
friendly.
+            return writeFileToChannel(file, limiter, 1 << 16);
         else
+            // write files in 1MiB chunks, since there may be blocking work 
performed to fetch it from disk,
+            // the data is never brought in process and is gated by the wire 
anyway
             return writeFileToChannelZeroCopy(file, limiter, 1 << 20, 1 << 20, 
2 << 20);
     }
 
-    public long writeFileToChannel(FileChannel fc, StreamRateLimiter limiter, 
int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+    @VisibleForTesting
+    long writeFileToChannel(FileChannel fc, StreamRateLimiter limiter, int 
batchSize) throws IOException
     {
         final long length = fc.size();
         long bytesTransferred = 0;
-        while (bytesTransferred < length)
+
+        try
+        {
+            while (bytesTransferred < length)
+            {
+                int toWrite = (int) min(batchSize, length - bytesTransferred);
+                final long position = bytesTransferred;
+
+                writeToChannel(bufferSupplier -> {
+                    ByteBuffer outBuffer = bufferSupplier.get(toWrite);
+                    long read = fc.read(outBuffer, position);
+                    if (read != toWrite)
+                        throw new IOException(String.format("could not read 
required number of bytes from " +
+                                                            "file to be 
streamed: read %d bytes, wanted %d bytes",
+                                                            read, toWrite));
+                    outBuffer.flip();
+                }, limiter);
+
+                if (logger.isTraceEnabled())
+                    logger.trace("Writing {} bytes at position {} of {}", 
toWrite, bytesTransferred, length);
+                bytesTransferred += toWrite;
+            }
+        }
+        finally
         {
-            int toWrite = (int) min(batchSize, length - bytesTransferred);
-            final long position = bytesTransferred;
-
-            writeToChannel(bufferSupplier -> {
-                ByteBuffer outBuffer = bufferSupplier.get(toWrite);
-                long read = fc.read(outBuffer, position);
-                if (read != toWrite)
-                    throw new IOException(String.format("could not read 
required number of bytes from " +
-                                                        "file to be streamed: 
read %d bytes, wanted %d bytes",
-                                                        read, toWrite));
-                outBuffer.flip();
-            }, limiter);
-
-            if (logger.isTraceEnabled())
-                logger.trace("Writing {} bytes at position {} of {}", toWrite, 
bytesTransferred, length);
-            bytesTransferred += toWrite;
+            // we don't need to wait until byte buffer is flushed by netty
+            fc.close();
         }
 
         return bytesTransferred;
     }
 
-    public long writeFileToChannelZeroCopy(FileChannel file, StreamRateLimiter 
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+    @VisibleForTesting
+    long writeFileToChannelZeroCopy(FileChannel file, StreamRateLimiter 
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
     {
         final long length = file.size();
         long bytesTransferred = 0;
diff --git a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
index fa5009a..305dc55 100644
--- a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
+++ b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
@@ -18,8 +18,14 @@
 
 package org.apache.cassandra.net;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.Random;
+
 import org.junit.Test;
 
 import io.netty.buffer.ByteBuf;
@@ -29,6 +35,8 @@ import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class AsyncStreamingOutputPlusTest
 {
@@ -108,7 +116,57 @@ public class AsyncStreamingOutputPlusTest
             assertEquals(1, read.getLong(0));
             assertEquals(2, read.getLong(8));
         }
+    }
+
+    @Test
+    public void testWriteFileToChannelZeroCopy() throws IOException
+    {
+        testWriteFileToChannel(true);
+    }
 
+    @Test
+    public void testWriteFileToChannelSSL() throws IOException
+    {
+        testWriteFileToChannel(false);
     }
 
+    private void testWriteFileToChannel(boolean zeroCopy) throws IOException
+    {
+        File file = populateTempData("zero_copy_" + zeroCopy);
+        int length = (int) file.length();
+
+        EmbeddedChannel channel = new TestChannel(4);
+        StreamManager.StreamRateLimiter limiter = new 
StreamManager.StreamRateLimiter(FBUtilities.getBroadcastAddressAndPort());
+
+        try (RandomAccessFile raf = new RandomAccessFile(file.getPath(), "r");
+             FileChannel fileChannel = raf.getChannel();
+             AsyncStreamingOutputPlus out = new 
AsyncStreamingOutputPlus(channel))
+        {
+            assertTrue(fileChannel.isOpen());
+
+            if (zeroCopy)
+                out.writeFileToChannelZeroCopy(fileChannel, limiter, length, 
length, length * 2);
+            else
+                out.writeFileToChannel(fileChannel, limiter, length);
+
+            assertEquals(length, out.flushed());
+            assertEquals(length, out.flushedToNetwork());
+            assertEquals(length, out.position());
+
+            assertFalse(fileChannel.isOpen());
+        }
+    }
+
+    private File populateTempData(String name) throws IOException
+    {
+        File file = Files.createTempFile(name, ".txt").toFile();
+        file.deleteOnExit();
+
+        Random r = new Random();
+        byte [] content = new byte[16];
+        r.nextBytes(content);
+        Files.write(file.toPath(), content);
+
+        return file;
+    }
 }
diff --git a/test/unit/org/apache/cassandra/net/TestChannel.java b/test/unit/org/apache/cassandra/net/TestChannel.java
index feddab0..17da6fa 100644
--- a/test/unit/org/apache/cassandra/net/TestChannel.java
+++ b/test/unit/org/apache/cassandra/net/TestChannel.java
@@ -27,8 +27,6 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelOutboundBuffer;
 import io.netty.channel.FileRegion;
 import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.cassandra.net.FrameEncoder;
-import org.apache.cassandra.net.GlobalBufferPoolAllocator;
 
 public class TestChannel extends EmbeddedChannel
 {
@@ -82,7 +80,7 @@ public class TestChannel extends EmbeddedChannel
                             return buf.writerIndex();
                         }
 
-                        public boolean isOpen() { return false; }
+                        public boolean isOpen() { return true; }
 
                         public void close() { }
                     }, 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to