This is an automated email from the ASF dual-hosted git repository.
djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7369194 Close channel and reduce buffer allocation during entire sstable streaming with SSL
7369194 is described below
commit 73691944c0ff9b01679cf5a6fe5944ad4c416509
Author: Zhao Yang <[email protected]>
AuthorDate: Wed Jun 24 18:37:47 2020 +0800
Close channel and reduce buffer allocation during entire sstable streaming with SSL
Patch by Zhao Yang; Reviewed by Caleb Rackliffe and Dinesh Joshi for CASSANDRA-15900
---
CHANGES.txt | 1 +
.../cassandra/net/AsyncStreamingOutputPlus.java | 66 ++++++++++++++--------
.../net/AsyncStreamingOutputPlusTest.java | 58 +++++++++++++++++++
.../unit/org/apache/cassandra/net/TestChannel.java | 4 +-
4 files changed, 101 insertions(+), 28 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fafb7d..c3fdf4f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha5
+ * Close channel and reduce buffer allocation during entire sstable streaming with SSL (CASSANDRA-15900)
* Prune expired messages less frequently in internode messaging (CASSANDRA-15700)
* Fix Ec2Snitch handling of legacy mode for dc names matching both formats, eg "us-west-2" (CASSANDRA-15878)
* Add support for server side DESCRIBE statements (CASSANDRA-14825)
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index e685584..680a9d3 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -23,11 +23,13 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
+import io.netty.channel.FileRegion;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.handler.ssl.SslHandler;
import org.apache.cassandra.io.compress.BufferType;
@@ -161,51 +163,65 @@ public class AsyncStreamingOutputPlus extends AsyncChannelOutputPlus
}
/**
+ * Writes all data in file channel to stream: <br>
+ * * For zero-copy-streaming, 1MiB at a time, with at most 2MiB in flight at once. <br>
+ * * For streaming with SSL, 64kb at a time, with at most 32+64kb (default low water mark + batch size) in flight. <br>
* <p>
- * Writes all data in file channel to stream, 1MiB at a time, with at most
2MiB in flight at once.
- * This method takes ownership of the provided {@code FileChannel}.
+ * This method takes ownership of the provided {@link FileChannel}.
* <p>
* WARNING: this method blocks only for permission to write to the netty channel; it exits before
- * the write is flushed to the network.
+ * the {@link FileRegion}(zero-copy) or {@link ByteBuffer}(ssl) is flushed to the network.
*/
public long writeFileToChannel(FileChannel file, StreamRateLimiter
limiter) throws IOException
{
- // write files in 1MiB chunks, since there may be blocking work
performed to fetch it from disk,
- // the data is never brought in process and is gated by the wire anyway
if (channel.pipeline().get(SslHandler.class) != null)
- return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 <<
20);
+ // each batch is loaded into ByteBuffer, 64kb is more BufferPool
friendly.
+ return writeFileToChannel(file, limiter, 1 << 16);
else
+ // write files in 1MiB chunks, since there may be blocking work
performed to fetch it from disk,
+ // the data is never brought in process and is gated by the wire
anyway
return writeFileToChannelZeroCopy(file, limiter, 1 << 20, 1 << 20,
2 << 20);
}
- public long writeFileToChannel(FileChannel fc, StreamRateLimiter limiter,
int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+ @VisibleForTesting
+ long writeFileToChannel(FileChannel fc, StreamRateLimiter limiter, int
batchSize) throws IOException
{
final long length = fc.size();
long bytesTransferred = 0;
- while (bytesTransferred < length)
+
+ try
+ {
+ while (bytesTransferred < length)
+ {
+ int toWrite = (int) min(batchSize, length - bytesTransferred);
+ final long position = bytesTransferred;
+
+ writeToChannel(bufferSupplier -> {
+ ByteBuffer outBuffer = bufferSupplier.get(toWrite);
+ long read = fc.read(outBuffer, position);
+ if (read != toWrite)
+ throw new IOException(String.format("could not read
required number of bytes from " +
+ "file to be
streamed: read %d bytes, wanted %d bytes",
+ read, toWrite));
+ outBuffer.flip();
+ }, limiter);
+
+ if (logger.isTraceEnabled())
+ logger.trace("Writing {} bytes at position {} of {}",
toWrite, bytesTransferred, length);
+ bytesTransferred += toWrite;
+ }
+ }
+ finally
{
- int toWrite = (int) min(batchSize, length - bytesTransferred);
- final long position = bytesTransferred;
-
- writeToChannel(bufferSupplier -> {
- ByteBuffer outBuffer = bufferSupplier.get(toWrite);
- long read = fc.read(outBuffer, position);
- if (read != toWrite)
- throw new IOException(String.format("could not read
required number of bytes from " +
- "file to be streamed:
read %d bytes, wanted %d bytes",
- read, toWrite));
- outBuffer.flip();
- }, limiter);
-
- if (logger.isTraceEnabled())
- logger.trace("Writing {} bytes at position {} of {}", toWrite,
bytesTransferred, length);
- bytesTransferred += toWrite;
+ // we don't need to wait until byte buffer is flushed by netty
+ fc.close();
}
return bytesTransferred;
}
- public long writeFileToChannelZeroCopy(FileChannel file, StreamRateLimiter
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+ @VisibleForTesting
+ long writeFileToChannelZeroCopy(FileChannel file, StreamRateLimiter
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
{
final long length = file.size();
long bytesTransferred = 0;
diff --git a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
index fa5009a..305dc55 100644
--- a/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
+++ b/test/unit/org/apache/cassandra/net/AsyncStreamingOutputPlusTest.java
@@ -18,8 +18,14 @@
package org.apache.cassandra.net;
+import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.util.Random;
+
import org.junit.Test;
import io.netty.buffer.ByteBuf;
@@ -29,6 +35,8 @@ import org.apache.cassandra.streaming.StreamManager;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class AsyncStreamingOutputPlusTest
{
@@ -108,7 +116,57 @@ public class AsyncStreamingOutputPlusTest
assertEquals(1, read.getLong(0));
assertEquals(2, read.getLong(8));
}
+ }
+
+ @Test
+ public void testWriteFileToChannelZeroCopy() throws IOException
+ {
+ testWriteFileToChannel(true);
+ }
+ @Test
+ public void testWriteFileToChannelSSL() throws IOException
+ {
+ testWriteFileToChannel(false);
}
+ private void testWriteFileToChannel(boolean zeroCopy) throws IOException
+ {
+ File file = populateTempData("zero_copy_" + zeroCopy);
+ int length = (int) file.length();
+
+ EmbeddedChannel channel = new TestChannel(4);
+ StreamManager.StreamRateLimiter limiter = new
StreamManager.StreamRateLimiter(FBUtilities.getBroadcastAddressAndPort());
+
+ try (RandomAccessFile raf = new RandomAccessFile(file.getPath(), "r");
+ FileChannel fileChannel = raf.getChannel();
+ AsyncStreamingOutputPlus out = new
AsyncStreamingOutputPlus(channel))
+ {
+ assertTrue(fileChannel.isOpen());
+
+ if (zeroCopy)
+ out.writeFileToChannelZeroCopy(fileChannel, limiter, length,
length, length * 2);
+ else
+ out.writeFileToChannel(fileChannel, limiter, length);
+
+ assertEquals(length, out.flushed());
+ assertEquals(length, out.flushedToNetwork());
+ assertEquals(length, out.position());
+
+ assertFalse(fileChannel.isOpen());
+ }
+ }
+
+ private File populateTempData(String name) throws IOException
+ {
+ File file = Files.createTempFile(name, ".txt").toFile();
+ file.deleteOnExit();
+
+ Random r = new Random();
+ byte [] content = new byte[16];
+ r.nextBytes(content);
+ Files.write(file.toPath(), content);
+
+ return file;
+ }
}
diff --git a/test/unit/org/apache/cassandra/net/TestChannel.java b/test/unit/org/apache/cassandra/net/TestChannel.java
index feddab0..17da6fa 100644
--- a/test/unit/org/apache/cassandra/net/TestChannel.java
+++ b/test/unit/org/apache/cassandra/net/TestChannel.java
@@ -27,8 +27,6 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.FileRegion;
import io.netty.channel.embedded.EmbeddedChannel;
-import org.apache.cassandra.net.FrameEncoder;
-import org.apache.cassandra.net.GlobalBufferPoolAllocator;
public class TestChannel extends EmbeddedChannel
{
@@ -82,7 +80,7 @@ public class TestChannel extends EmbeddedChannel
return buf.writerIndex();
}
- public boolean isOpen() { return false; }
+ public boolean isOpen() { return true; }
public void close() { }
}, 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]