This is an automated email from the ASF dual-hosted git repository.
djoshi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new e995c3f Allow sending Entire SSTables over SSL
e995c3f is described below
commit e995c3fcc7479e82f34721d9634093a807ff06bd
Author: Dinesh Joshi <[email protected]>
AuthorDate: Sat Apr 18 19:40:43 2020 -0700
Allow sending Entire SSTables over SSL
Patch by Dinesh Joshi; Reviewed by Joseph Lynch for CASSANDRA-15740
---
.../cassandra/config/DatabaseDescriptor.java | 9 ------
.../cassandra/net/AsyncStreamingOutputPlus.java | 35 ++++++++++++++++++++--
2 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 85bfa88..d4ee34b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -773,15 +773,6 @@ public class DatabaseDescriptor
"server_encryption_options.internode_encryption = " +
conf.server_encryption_options.internode_encryption, false);
}
- if (conf.stream_entire_sstables)
- {
- if (conf.server_encryption_options.enabled ||
conf.server_encryption_options.optional)
- {
- logger.warn("Internode encryption enabled. Disabling zero copy
SSTable transfers for streaming.");
- conf.stream_entire_sstables = false;
- }
- }
-
if (conf.max_value_size_in_mb <= 0)
throw new ConfigurationException("max_value_size_in_mb must be
positive", false);
else if (conf.max_value_size_in_mb >= 2048)
diff --git a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
index a2dff41..e685584 100644
--- a/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
+++ b/src/java/org/apache/cassandra/net/AsyncStreamingOutputPlus.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelPromise;
import io.netty.channel.WriteBufferWaterMark;
+import io.netty.handler.ssl.SslHandler;
import org.apache.cassandra.io.compress.BufferType;
import org.apache.cassandra.io.util.DataOutputStreamPlus;
import org.apache.cassandra.net.SharedDefaultFileRegion.SharedFileChannel;
@@ -171,10 +172,40 @@ public class AsyncStreamingOutputPlus extends
AsyncChannelOutputPlus
{
// write files in 1MiB chunks, since there may be blocking work
performed to fetch it from disk,
// the data is never brought in process and is gated by the wire anyway
- return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 << 20);
+ if (channel.pipeline().get(SslHandler.class) != null)
+ return writeFileToChannel(file, limiter, 1 << 20, 1 << 20, 2 <<
20);
+ else
+ return writeFileToChannelZeroCopy(file, limiter, 1 << 20, 1 << 20,
2 << 20);
}
- public long writeFileToChannel(FileChannel file, StreamRateLimiter
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+ public long writeFileToChannel(FileChannel fc, StreamRateLimiter limiter,
int batchSize, int lowWaterMark, int highWaterMark) throws IOException
+ {
+ final long length = fc.size();
+ long bytesTransferred = 0;
+ while (bytesTransferred < length)
+ {
+ int toWrite = (int) min(batchSize, length - bytesTransferred);
+ final long position = bytesTransferred;
+
+ writeToChannel(bufferSupplier -> {
+ ByteBuffer outBuffer = bufferSupplier.get(toWrite);
+ long read = fc.read(outBuffer, position);
+ if (read != toWrite)
+ throw new IOException(String.format("could not read
required number of bytes from " +
+ "file to be streamed:
read %d bytes, wanted %d bytes",
+ read, toWrite));
+ outBuffer.flip();
+ }, limiter);
+
+ if (logger.isTraceEnabled())
+ logger.trace("Writing {} bytes at position {} of {}", toWrite,
bytesTransferred, length);
+ bytesTransferred += toWrite;
+ }
+
+ return bytesTransferred;
+ }
+
+ public long writeFileToChannelZeroCopy(FileChannel file, StreamRateLimiter
limiter, int batchSize, int lowWaterMark, int highWaterMark) throws IOException
{
final long length = file.size();
long bytesTransferred = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]