Stream entire SSTables when possible patch by Dinesh Joshi; reviewed by Aleksey Yeschenko and Ariel Weisberg for CASSANDRA-14556
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/47a12c52 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/47a12c52 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/47a12c52 Branch: refs/heads/trunk Commit: 47a12c52a313258307ab88392f75c5866d9a2bb1 Parents: 6ba2fb9 Author: Dinesh A. Joshi <[email protected]> Authored: Tue Jul 3 12:07:11 2018 -0700 Committer: Aleksey Yeshchenko <[email protected]> Committed: Fri Jul 27 17:50:25 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 3 + conf/cassandra.yaml | 12 + .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 14 + .../apache/cassandra/db/ColumnFamilyStore.java | 2 +- .../org/apache/cassandra/db/DiskBoundaries.java | 17 +- .../cassandra/db/compaction/Verifier.java | 26 +- .../apache/cassandra/db/lifecycle/LogFile.java | 2 +- .../cassandra/db/lifecycle/LogReplicaSet.java | 3 +- .../CassandraCompressedStreamReader.java | 131 ++++++++ .../CassandraCompressedStreamWriter.java | 152 +++++++++ .../CassandraEntireSSTableStreamReader.java | 177 ++++++++++ .../CassandraEntireSSTableStreamWriter.java | 120 +++++++ .../db/streaming/CassandraIncomingFile.java | 17 +- .../db/streaming/CassandraOutgoingFile.java | 132 +++++++- .../db/streaming/CassandraStreamHeader.java | 250 ++++++++++++-- .../db/streaming/CassandraStreamManager.java | 4 +- .../db/streaming/CassandraStreamReader.java | 5 +- .../db/streaming/ComponentManifest.java | 130 ++++++++ .../CompressedCassandraStreamReader.java | 131 -------- .../CompressedCassandraStreamWriter.java | 152 --------- .../db/streaming/CompressedInputStream.java | 4 +- .../cassandra/db/streaming/IStreamReader.java | 32 ++ .../apache/cassandra/io/sstable/Component.java | 7 +- .../cassandra/io/sstable/SSTableLoader.java | 2 +- .../format/big/BigTableZeroCopyWriter.java | 226 +++++++++++++ 
.../io/util/BufferedDataOutputStreamPlus.java | 4 +- .../cassandra/io/util/CheckedFunction.java | 25 ++ .../cassandra/io/util/DataOutputPlus.java | 4 +- .../io/util/RebufferingInputStream.java | 4 +- .../cassandra/io/util/SequentialWriter.java | 17 +- .../io/util/UnbufferedDataOutputStreamPlus.java | 2 +- .../net/async/ByteBufDataOutputPlus.java | 5 +- .../net/async/ByteBufDataOutputStreamPlus.java | 60 +++- .../net/async/NonClosingDefaultFileRegion.java | 51 +++ .../async/RebufferingByteBufDataInputPlus.java | 39 +++ .../cassandra/streaming/StreamCoordinator.java | 3 +- .../cassandra/streaming/StreamReceiveTask.java | 5 +- .../cassandra/streaming/StreamResultFuture.java | 13 +- .../cassandra/streaming/StreamSession.java | 6 +- .../async/NettyStreamingMessageSender.java | 2 +- .../async/StreamingInboundHandler.java | 2 +- .../streaming/messages/StreamInitMessage.java | 7 +- .../org/apache/cassandra/utils/Collectors3.java | 54 +++ test/conf/cassandra.yaml | 2 + .../cassandra/streaming/LongStreamingTest.java | 6 +- .../microbench/ZeroCopyStreamingBenchmark.java | 329 +++++++++++++++++++ .../org/apache/cassandra/db/VerifyTest.java | 30 +- .../CassandraEntireSSTableStreamWriterTest.java | 209 ++++++++++++ .../db/streaming/CassandraOutgoingFileTest.java | 145 ++++++++ .../db/streaming/CassandraStreamHeaderTest.java | 62 +++- .../db/streaming/ComponentManifestTest.java | 64 ++++ .../io/sstable/BigTableWriterTest.java | 5 +- .../cassandra/io/sstable/LegacySSTableTest.java | 2 + .../format/big/BigTableZeroCopyWriterTest.java | 208 ++++++++++++ .../RebufferingByteBufDataInputPlusTest.java | 98 ++++++ .../serializers/SerializationUtils.java | 1 - .../streaming/StreamTransferTaskTest.java | 4 +- .../streaming/StreamingTransferTest.java | 1 + 60 files changed, 2809 insertions(+), 413 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/CHANGES.txt 
---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6b6418c..6ede70e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Stream entire SSTables when possible (CASSANDRA-14556) * Add experimental support for Java 11 (CASSANDRA-9608) * Make PeriodicCommitLogService.blockWhenSyncLagsNanos configurable (CASSANDRA-14580) * Improve logging in MessageInHandler's constructor (CASSANDRA-14576) http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 75885e9..3fab849 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -91,6 +91,9 @@ New features statements after the cache has been refreshed. CASSANDRA-13985 - Support for audit logging of database activity. If enabled, logs every incoming CQL command request, Authentication (successful as well as unsuccessful login) to a node. + - Faster streaming of entire SSTables using ZeroCopy APIs. If enabled, Cassandra will use stream + entire SSTables, significantly speeding up transfers. Any streaming related operations will see + corresponding improvement. See CASSANDRA-14556. Upgrading --------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 439b85a..663daaa 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -788,6 +788,18 @@ compaction_throughput_mb_per_sec: 16 # between the sstables, reducing page cache churn and keeping hot rows hot sstable_preemptive_open_interval_in_mb: 50 +# When enabled, permits Cassandra to zero-copy stream entire eligible +# SSTables between nodes, including every component. +# This speeds up the network transfer significantly subject to +# throttling specified by stream_throughput_outbound_megabits_per_sec. 
+# Enabling this will reduce the GC pressure on sending and receiving node. +# When unset, the default is enabled. While this feature tries to keep the +# disks balanced, it cannot guarantee it. This feature will be automatically +# disabled if internode encryption is enabled. Currently this can be used with +# Leveled Compaction. Once CASSANDRA-14586 is fixed other compaction strategies +# will benefit as well when used in combination with CASSANDRA-6696. +# stream_entire_sstables: true + # Throttles all outbound streaming file transfers on this node to the # given total throughput in Mbps. This is necessary because Cassandra does # mostly sequential IO when streaming data during bootstrap or repair, which http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/Config.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index 0d4760e..3a7ff0d 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -381,6 +381,7 @@ public class Config public int block_for_peers_timeout_in_secs = 10; public volatile boolean automatic_sstable_upgrade = false; public volatile int max_concurrent_automatic_sstable_upgrades = 1; + public boolean stream_entire_sstables = true; public volatile AuditLogOptions audit_logging_options = new AuditLogOptions(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 6301ab0..366dac7 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ 
-714,6 +714,15 @@ public class DatabaseDescriptor "server_encryption_options.internode_encryption = " + conf.server_encryption_options.internode_encryption, false); } + if (conf.stream_entire_sstables) + { + if (conf.server_encryption_options.enabled || conf.server_encryption_options.optional) + { + logger.warn("Internode encryption enabled. Disabling zero copy SSTable transfers for streaming."); + conf.stream_entire_sstables = false; + } + } + if (conf.max_value_size_in_mb <= 0) throw new ConfigurationException("max_value_size_in_mb must be positive", false); else if (conf.max_value_size_in_mb >= 2048) @@ -2274,6 +2283,11 @@ public class DatabaseDescriptor return conf.streaming_connections_per_host; } + public static boolean streamEntireSSTables() + { + return conf.stream_entire_sstables; + } + public static String getLocalDataCenter() { return localDC; http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 9c4921e..f03ffe6 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -791,7 +791,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return newSSTableDescriptor(directory, format.info.getLatestVersion(), format); } - private Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format) + public Descriptor newSSTableDescriptor(File directory, Version version, SSTableFormat.Type format) { return new Descriptor(version, directory, http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/DiskBoundaries.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java index 086bc84..22f17b0 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaries.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java @@ -129,4 +129,19 @@ public class DiskBoundaries { return directories.get(getDiskIndex(sstable)); } -} \ No newline at end of file + + public Directories.DataDirectory getCorrectDiskForKey(DecoratedKey key) + { + if (positions == null) + return null; + + return directories.get(getDiskIndex(key)); + } + + private int getDiskIndex(DecoratedKey key) + { + int pos = Collections.binarySearch(positions, key); + assert pos < 0; + return -pos - 1; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/compaction/Verifier.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Verifier.java b/src/java/org/apache/cassandra/db/compaction/Verifier.java index bc9679d..db49369 100644 --- a/src/java/org/apache/cassandra/db/compaction/Verifier.java +++ b/src/java/org/apache/cassandra/db/compaction/Verifier.java @@ -180,7 +180,7 @@ public class Verifier implements Closeable while (iter.hasNext()) { DecoratedKey key = iter.next(); - rangeOwnHelper.check(key); + rangeOwnHelper.validate(key); } } catch (Throwable t) @@ -262,7 +262,7 @@ public class Verifier implements Closeable { try { - rangeOwnHelper.check(key); + rangeOwnHelper.validate(key); } catch (Throwable t) { @@ -360,13 +360,27 @@ public class Verifier implements Closeable * @param key the key * @throws RuntimeException if the key is not contained */ - public void check(DecoratedKey key) + public void validate(DecoratedKey key) + { + if (!check(key)) + throw new RuntimeException("Key " + key + " is not contained in the given ranges"); + } + + /** + * check if the given key is contained in any of the given ranges + * + * Must be called in sorted 
order - key should be increasing + * + * @param key the key + * @return boolean + */ + public boolean check(DecoratedKey key) { assert lastKey == null || key.compareTo(lastKey) > 0; lastKey = key; if (normalizedRanges.isEmpty()) // handle tests etc where we don't have any ranges - return; + return true; if (rangeIndex > normalizedRanges.size() - 1) throw new IllegalStateException("RangeOwnHelper can only be used to find the first out-of-range-token"); @@ -375,8 +389,10 @@ public class Verifier implements Closeable { rangeIndex++; if (rangeIndex > normalizedRanges.size() - 1) - throw new RuntimeException("Key "+key+" is not contained in the given ranges"); + return false; } + + return true; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java index af6f435..98be0a0 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/LogFile.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogFile.java @@ -66,7 +66,7 @@ final class LogFile implements AutoCloseable private final LogReplicaSet replicas = new LogReplicaSet(); // The transaction records, this set must be ORDER PRESERVING - private final LinkedHashSet<LogRecord> records = new LinkedHashSet<>(); + private final Set<LogRecord> records = Collections.synchronizedSet(new LinkedHashSet<>()); // TODO: Hack until we fix CASSANDRA-14554 // The type of the transaction private final OperationType type; http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java index 65be285..f5423d6 100644 
--- a/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java +++ b/src/java/org/apache/cassandra/db/lifecycle/LogReplicaSet.java @@ -20,6 +20,7 @@ package org.apache.cassandra.db.lifecycle; import java.io.File; import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -45,7 +46,7 @@ public class LogReplicaSet implements AutoCloseable { private static final Logger logger = LoggerFactory.getLogger(LogReplicaSet.class); - private final Map<File, LogReplica> replicasByFile = new LinkedHashMap<>(); + private final Map<File, LogReplica> replicasByFile = Collections.synchronizedMap(new LinkedHashMap<>()); // TODO: Hack until we fix CASSANDRA-14554 private Collection<LogReplica> replicas() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java new file mode 100644 index 0000000..eb993ff --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamReader.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.streaming; + +import java.io.IOException; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.TrackedDataInputPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; +import org.apache.cassandra.utils.ChecksumType; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; + +/** + * CassandraStreamReader that reads from streamed compressed SSTable + */ +public class CassandraCompressedStreamReader extends CassandraStreamReader +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamReader.class); + + protected final CompressionInfo compressionInfo; + + public CassandraCompressedStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) + { + super(header, streamHeader, session); + this.compressionInfo = streamHeader.compressionInfo; + } + + /** + * @return SSTable transferred + * @throws java.io.IOException if reading the remote sstable fails. 
Will throw an RTE if local write fails. + */ + @Override + @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed + public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException + { + long totalSize = totalSize(); + + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + + if (cfs == null) + { + // schema was dropped during streaming + throw new IOException("CF " + tableId + " was dropped during streaming"); + } + + logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.", + session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair, + cfs.getTableName()); + + StreamDeserializer deserializer = null; + SSTableMultiWriter writer = null; + try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance)) + { + TrackedDataInputPlus in = new TrackedDataInputPlus(cis); + deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); + writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); + String filename = writer.getFilename(); + int sectionIdx = 0; + for (SSTableReader.PartitionPositionBounds section : sections) + { + assert cis.getTotalCompressedBytesRead() <= totalSize; + long sectionLength = section.upperPosition - section.lowerPosition; + + logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength); + // skip to beginning of section inside chunk + cis.position(section.lowerPosition); + in.reset(0); + + while (in.getBytesRead() < sectionLength) + { + writePartition(deserializer, writer); + // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred + session.progress(filename, ProgressInfo.Direction.IN, 
cis.getTotalCompressedBytesRead(), totalSize); + } + } + logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum, + session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); + return writer; + } + catch (Throwable e) + { + Object partitionKey = deserializer != null ? deserializer.partitionKey() : ""; + logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", + session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName()); + if (writer != null) + { + writer.abort(e); + } + if (extractIOExceptionCause(e).isPresent()) + throw e; + throw Throwables.propagate(e); + } + } + + @Override + protected long totalSize() + { + long size = 0; + // calculate total length of transferring chunks + for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) + size += chunk.length + 4; // 4 bytes for CRC + return size; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java new file mode 100644 index 0000000..3b971f8 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraCompressedStreamWriter.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.utils.FBUtilities; + +/** + * CassandraStreamWriter for compressed SSTable. 
+ */ +public class CassandraCompressedStreamWriter extends CassandraStreamWriter +{ + private static final int CHUNK_SIZE = 1 << 16; + + private static final Logger logger = LoggerFactory.getLogger(CassandraCompressedStreamWriter.class); + + private final CompressionInfo compressionInfo; + + public CassandraCompressedStreamWriter(SSTableReader sstable, Collection<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, StreamSession session) + { + super(sstable, sections, session); + this.compressionInfo = compressionInfo; + } + + @Override + public void write(DataOutputStreamPlus out) throws IOException + { + assert out instanceof ByteBufDataOutputStreamPlus; + ByteBufDataOutputStreamPlus output = (ByteBufDataOutputStreamPlus)out; + long totalSize = totalSize(); + logger.debug("[Stream #{}] Start streaming file {} to {}, repairedAt = {}, totalSize = {}", session.planId(), + sstable.getFilename(), session.peer, sstable.getSSTableMetadata().repairedAt, totalSize); + try (ChannelProxy fc = sstable.getDataChannel().sharedCopy()) + { + long progress = 0L; + // calculate chunks to transfer. we want to send continuous chunks altogether. 
+ List<SSTableReader.PartitionPositionBounds> sections = getTransferSections(compressionInfo.chunks); + + int sectionIdx = 0; + + // stream each of the required sections of the file + for (final SSTableReader.PartitionPositionBounds section : sections) + { + // length of the section to stream + long length = section.upperPosition - section.lowerPosition; + + logger.trace("[Stream #{}] Writing section {} with length {} to stream.", session.planId(), sectionIdx++, length); + + // tracks write progress + long bytesTransferred = 0; + while (bytesTransferred < length) + { + final int toTransfer = (int) Math.min(CHUNK_SIZE, length - bytesTransferred); + limiter.acquire(toTransfer); + + ByteBuffer outBuffer = ByteBuffer.allocateDirect(toTransfer); + long lastWrite; + try + { + lastWrite = fc.read(outBuffer, section.lowerPosition + bytesTransferred); + assert lastWrite == toTransfer : String.format("could not read required number of bytes from file to be streamed: read %d bytes, wanted %d bytes", lastWrite, toTransfer); + outBuffer.flip(); + output.writeToChannel(outBuffer); + } + catch (IOException e) + { + FileUtils.clean(outBuffer); + throw e; + } + + bytesTransferred += lastWrite; + progress += lastWrite; + session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); + } + } + logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", + session.planId(), sstable.getFilename(), session.peer, FBUtilities.prettyPrintMemory(progress), FBUtilities.prettyPrintMemory(totalSize)); + } + } + + @Override + protected long totalSize() + { + long size = 0; + // calculate total length of transferring chunks + for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) + size += chunk.length + 4; // 4 bytes for CRC + return size; + } + + // chunks are assumed to be sorted by offset + private List<SSTableReader.PartitionPositionBounds> getTransferSections(CompressionMetadata.Chunk[] 
chunks) + { + List<SSTableReader.PartitionPositionBounds> transferSections = new ArrayList<>(); + SSTableReader.PartitionPositionBounds lastSection = null; + for (CompressionMetadata.Chunk chunk : chunks) + { + if (lastSection != null) + { + if (chunk.offset == lastSection.upperPosition) + { + // extend previous section to end of this chunk + lastSection = new SSTableReader.PartitionPositionBounds(lastSection.lowerPosition, chunk.offset + chunk.length + 4); // 4 bytes for CRC + } + else + { + transferSections.add(lastSection); + lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4); + } + } + else + { + lastSection = new SSTableReader.PartitionPositionBounds(chunk.offset, chunk.offset + chunk.length + 4); + } + } + if (lastSection != null) + transferSections.add(lastSection); + return transferSections; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java new file mode 100644 index 0000000..6f8f06a --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamReader.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import com.google.common.base.Throwables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Directories; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.big.BigTableZeroCopyWriter; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamReceiver; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessageHeader; + +import static java.lang.String.format; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraEntireSSTableStreamReader reads SSTable off the wire and writes it to disk. 
+ */ +public class CassandraEntireSSTableStreamReader implements IStreamReader +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamReader.class); + + private final TableId tableId; + private final StreamSession session; + private final CassandraStreamHeader header; + private final int fileSequenceNumber; + + public CassandraEntireSSTableStreamReader(StreamMessageHeader messageHeader, CassandraStreamHeader streamHeader, StreamSession session) + { + if (streamHeader.format != SSTableFormat.Type.BIG) + throw new AssertionError("Unsupported SSTable format " + streamHeader.format); + + if (session.getPendingRepair() != null) + { + // we should only ever be streaming pending repair sstables if the session has a pending repair id + if (!session.getPendingRepair().equals(messageHeader.pendingRepair)) + throw new IllegalStateException(format("Stream Session & SSTable (%s) pendingRepair UUID mismatch.", messageHeader.tableId)); + } + + this.header = streamHeader; + this.session = session; + this.tableId = messageHeader.tableId; + this.fileSequenceNumber = messageHeader.sequenceNumber; + } + + /** + * @param in where this reads data from + * @return SSTable transferred + * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
+ */ + @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed + @Override + public SSTableMultiWriter read(DataInputPlus in) throws IOException + { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs == null) + { + // schema was dropped during streaming + throw new IOException("Table " + tableId + " was dropped during streaming"); + } + + ComponentManifest manifest = header.componentManifest; + long totalSize = manifest.totalSize(); + + logger.debug("[Stream #{}] Started receiving sstable #{} from {}, size = {}, table = {}", + session.planId(), + fileSequenceNumber, + session.peer, + prettyPrintMemory(totalSize), + cfs.metadata()); + + BigTableZeroCopyWriter writer = null; + + try + { + writer = createWriter(cfs, totalSize, manifest.components()); + long bytesRead = 0; + for (Component component : manifest.components()) + { + long length = manifest.sizeOf(component); + + logger.debug("[Stream #{}] Started receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}", + session.planId(), + component, + session.peer, + prettyPrintMemory(length), + prettyPrintMemory(bytesRead), + prettyPrintMemory(totalSize)); + + writer.writeComponent(component.type, in, length); + session.progress(writer.descriptor.filenameFor(component), ProgressInfo.Direction.IN, length, length); + bytesRead += length; + + logger.debug("[Stream #{}] Finished receiving {} component from {}, componentSize = {}, readBytes = {}, totalSize = {}", + session.planId(), + component, + session.peer, + prettyPrintMemory(length), + prettyPrintMemory(bytesRead), + prettyPrintMemory(totalSize)); + } + + writer.descriptor.getMetadataSerializer().mutateLevel(writer.descriptor, header.sstableLevel); + return writer; + } + catch (Throwable e) + { + logger.error("[Stream {}] Error while reading sstable from stream for table = {}", session.planId(), cfs.metadata(), e); + if (writer != null) + e = writer.abort(e); + 
Throwables.throwIfUnchecked(e); + throw new RuntimeException(e); + } + } + + private File getDataDir(ColumnFamilyStore cfs, long totalSize) throws IOException + { + Directories.DataDirectory localDir = cfs.getDirectories().getWriteableLocation(totalSize); + if (localDir == null) + throw new IOException(format("Insufficient disk space to store %s", prettyPrintMemory(totalSize))); + + File dir = cfs.getDirectories().getLocationForDisk(cfs.getDiskBoundaries().getCorrectDiskForKey(header.firstKey)); + + if (dir == null) + return cfs.getDirectories().getDirectoryForNewSSTables(); + + return dir; + } + + @SuppressWarnings("resource") + protected BigTableZeroCopyWriter createWriter(ColumnFamilyStore cfs, long totalSize, Collection<Component> components) throws IOException + { + File dataDir = getDataDir(cfs, totalSize); + + StreamReceiver streamReceiver = session.getAggregator(tableId); + assert streamReceiver instanceof CassandraStreamReceiver; + + LifecycleTransaction txn = CassandraStreamReceiver.fromReceiver(session.getAggregator(tableId)).getTransaction(); + + Descriptor desc = cfs.newSSTableDescriptor(dataDir, header.version, header.format); + + logger.debug("[Table #{}] {} Components to write: {}", cfs.metadata(), desc.filenameFor(Component.DATA), components); + + return new BigTableZeroCopyWriter(desc, cfs.metadata, txn, components); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java new file mode 100644 index 0000000..7a20110 --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/CassandraEntireSSTableStreamWriter.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * 
or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; +import org.apache.cassandra.streaming.ProgressInfo; +import org.apache.cassandra.streaming.StreamManager; +import org.apache.cassandra.streaming.StreamSession; + +import static org.apache.cassandra.streaming.StreamManager.StreamRateLimiter; +import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; + +/** + * CassandraEntireSSTableStreamWriter streams the entire SSTable to given channel. 
+ */ +public class CassandraEntireSSTableStreamWriter +{ + private static final Logger logger = LoggerFactory.getLogger(CassandraEntireSSTableStreamWriter.class); + + private final SSTableReader sstable; + private final ComponentManifest manifest; + private final StreamSession session; + private final StreamRateLimiter limiter; + + public CassandraEntireSSTableStreamWriter(SSTableReader sstable, StreamSession session, ComponentManifest manifest) + { + this.session = session; + this.sstable = sstable; + this.manifest = manifest; + this.limiter = StreamManager.getRateLimiter(session.peer); + } + + /** + * Stream the entire file to given channel. + * <p> + * + * @param out where this writes data to + * @throws IOException on any I/O error + */ + public void write(ByteBufDataOutputStreamPlus out) throws IOException + { + long totalSize = manifest.totalSize(); + logger.debug("[Stream #{}] Start streaming sstable {} to {}, repairedAt = {}, totalSize = {}", + session.planId(), + sstable.getFilename(), + session.peer, + sstable.getSSTableMetadata().repairedAt, + prettyPrintMemory(totalSize)); + + long progress = 0L; + + for (Component component : manifest.components()) + { + @SuppressWarnings("resource") // this is closed after the file is transferred by ByteBufDataOutputStreamPlus + FileChannel in = new RandomAccessFile(sstable.descriptor.filenameFor(component), "r").getChannel(); + + // Total Length to transmit for this file + long length = in.size(); + + // tracks write progress + logger.debug("[Stream #{}] Streaming {}.{} gen {} component {} size {}", session.planId(), + sstable.getKeyspaceName(), + sstable.getColumnFamilyName(), + sstable.descriptor.generation, + component, + prettyPrintMemory(length)); + + long bytesWritten = out.writeToChannel(in, limiter); + progress += bytesWritten; + + session.progress(sstable.descriptor.filenameFor(component), ProgressInfo.Direction.OUT, bytesWritten, length); + + logger.debug("[Stream #{}] Finished streaming {}.{} gen {} 
component {} to {}, xfered = {}, length = {}, totalSize = {}", + session.planId(), + sstable.getKeyspaceName(), + sstable.getColumnFamilyName(), + sstable.descriptor.generation, + component, + session.peer, + prettyPrintMemory(bytesWritten), + prettyPrintMemory(length), + prettyPrintMemory(totalSize)); + } + + out.flush(); + + logger.debug("[Stream #{}] Finished streaming sstable {} to {}, xfered = {}, totalSize = {}", + session.planId(), + sstable.getFilename(), + session.peer, + prettyPrintMemory(progress), + prettyPrintMemory(totalSize)); + + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java index 16698e5..c65ca62 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraIncomingFile.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.util.Objects; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.io.sstable.SSTableMultiWriter; @@ -45,6 +47,8 @@ public class CassandraIncomingFile implements IncomingStream private volatile SSTableMultiWriter sstable; private volatile long size = -1; + private static final Logger logger = LoggerFactory.getLogger(CassandraIncomingFile.class); + public CassandraIncomingFile(ColumnFamilyStore cfs, StreamSession session, StreamMessageHeader header) { this.cfs = cfs; @@ -56,9 +60,16 @@ public class CassandraIncomingFile implements IncomingStream public synchronized void read(DataInputPlus in, int version) throws IOException { CassandraStreamHeader streamHeader = 
CassandraStreamHeader.serializer.deserialize(in, version); - CassandraStreamReader reader = !streamHeader.isCompressed() - ? new CassandraStreamReader(header, streamHeader, session) - : new CompressedCassandraStreamReader(header, streamHeader, session); + logger.debug("Incoming stream entireSSTable={} components={}", streamHeader.isEntireSSTable, streamHeader.componentManifest); + + IStreamReader reader; + if (streamHeader.isEntireSSTable) + reader = new CassandraEntireSSTableStreamReader(header, streamHeader, session); + else if (streamHeader.isCompressed()) + reader = new CassandraCompressedStreamReader(header, streamHeader, session); + else + reader = new CassandraStreamReader(header, streamHeader, session); + size = streamHeader.size(); sstable = reader.read(in); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java index 6ec1f85..5252187 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraOutgoingFile.java @@ -18,51 +18,100 @@ package org.apache.cassandra.db.streaming; +import java.io.File; import java.io.IOException; +import java.util.Collection; +import java.util.LinkedHashMap; import java.util.List; import java.util.Objects; import java.util.UUID; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; +import 
org.apache.cassandra.db.compaction.LeveledCompactionStrategy; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.KeyIterator; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.DataOutputStreamPlus; +import org.apache.cassandra.net.async.ByteBufDataOutputStreamPlus; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.streaming.OutgoingStream; import org.apache.cassandra.streaming.StreamOperation; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.utils.concurrent.Ref; +import static org.apache.cassandra.db.compaction.Verifier.RangeOwnHelper; + /** * used to transfer the part(or whole) of a SSTable data file */ public class CassandraOutgoingFile implements OutgoingStream { + public static final List<Component> STREAM_COMPONENTS = ImmutableList.of(Component.DATA, Component.PRIMARY_INDEX, Component.STATS, + Component.COMPRESSION_INFO, Component.FILTER, Component.SUMMARY, + Component.DIGEST, Component.CRC); + private final Ref<SSTableReader> ref; private final long estimatedKeys; private final List<SSTableReader.PartitionPositionBounds> sections; private final String filename; private final CassandraStreamHeader header; private final boolean keepSSTableLevel; + private final ComponentManifest manifest; + private Boolean isFullyContained; - public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, List<SSTableReader.PartitionPositionBounds> sections, long estimatedKeys) + private final List<Range<Token>> ranges; + + public CassandraOutgoingFile(StreamOperation operation, Ref<SSTableReader> ref, + List<SSTableReader.PartitionPositionBounds> sections, Collection<Range<Token>> ranges, + long estimatedKeys) { Preconditions.checkNotNull(ref.get()); this.ref = ref; this.estimatedKeys = estimatedKeys; this.sections = sections; + this.ranges = 
ImmutableList.copyOf(ranges); this.filename = ref.get().getFilename(); + this.manifest = getComponentManifest(ref.get()); SSTableReader sstable = ref.get(); keepSSTableLevel = operation == StreamOperation.BOOTSTRAP || operation == StreamOperation.REBUILD; - this.header = new CassandraStreamHeader(sstable.descriptor.version, - sstable.descriptor.formatType, - estimatedKeys, - sections, - sstable.compression ? sstable.getCompressionMetadata() : null, - keepSSTableLevel ? sstable.getSSTableLevel() : 0, - sstable.header.toComponent()); + this.header = + CassandraStreamHeader.builder() + .withSSTableFormat(sstable.descriptor.formatType) + .withSSTableVersion(sstable.descriptor.version) + .withSSTableLevel(keepSSTableLevel ? sstable.getSSTableLevel() : 0) + .withEstimatedKeys(estimatedKeys) + .withSections(sections) + .withCompressionMetadata(sstable.compression ? sstable.getCompressionMetadata() : null) + .withSerializationHeader(sstable.header.toComponent()) + .isEntireSSTable(shouldStreamEntireSSTable()) + .withComponentManifest(manifest) + .withFirstKey(sstable.first) + .withTableId(sstable.metadata().id) + .build(); + } + + @VisibleForTesting + public static ComponentManifest getComponentManifest(SSTableReader sstable) + { + LinkedHashMap<Component, Long> components = new LinkedHashMap<>(STREAM_COMPONENTS.size()); + for (Component component : STREAM_COMPONENTS) + { + File file = new File(sstable.descriptor.filenameFor(component)); + if (file.exists()) + components.put(component, file.length()); + } + + return new ComponentManifest(components); } public static CassandraOutgoingFile fromStream(OutgoingStream stream) @@ -114,11 +163,68 @@ public class CassandraOutgoingFile implements OutgoingStream CassandraStreamHeader.serializer.serialize(header, out, version); out.flush(); - CassandraStreamWriter writer = header.compressionInfo == null ? 
- new CassandraStreamWriter(sstable, header.sections, session) : - new CompressedCassandraStreamWriter(sstable, header.sections, - header.compressionInfo, session); - writer.write(out); + if (shouldStreamEntireSSTable() && out instanceof ByteBufDataOutputStreamPlus) + { + CassandraEntireSSTableStreamWriter writer = new CassandraEntireSSTableStreamWriter(sstable, session, manifest); + writer.write((ByteBufDataOutputStreamPlus) out); + } + else + { + CassandraStreamWriter writer = (header.compressionInfo == null) ? + new CassandraStreamWriter(sstable, header.sections, session) : + new CassandraCompressedStreamWriter(sstable, header.sections, + header.compressionInfo, session); + writer.write(out); + } + } + + @VisibleForTesting + public boolean shouldStreamEntireSSTable() + { + // don't stream if full sstable transfers are disabled or legacy counter shards are present + if (!DatabaseDescriptor.streamEntireSSTables() || ref.get().getSSTableMetadata().hasLegacyCounterShards) + return false; + + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(getTableId()); + + if (cfs == null) + return false; + + AbstractCompactionStrategy compactionStrategy = cfs.getCompactionStrategyManager() + .getCompactionStrategyFor(ref.get()); + + if (compactionStrategy instanceof LeveledCompactionStrategy) + return contained(ranges, ref.get()); + + return false; + } + + @VisibleForTesting + public boolean contained(List<Range<Token>> normalizedRanges, SSTableReader sstable) + { + if (isFullyContained != null) + return isFullyContained; + + isFullyContained = computeContainment(normalizedRanges, sstable); + return isFullyContained; + } + + private boolean computeContainment(List<Range<Token>> normalizedRanges, SSTableReader sstable) + { + if (normalizedRanges == null) + return false; + + RangeOwnHelper rangeOwnHelper = new RangeOwnHelper(normalizedRanges); + try (KeyIterator iter = new KeyIterator(sstable.descriptor, sstable.metadata())) + { + while (iter.hasNext()) + { + DecoratedKey key 
= iter.next(); + if (!rangeOwnHelper.check(key)) + return false; + } + } + return true; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java index 43631b0..2af56de 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamHeader.java @@ -15,16 +15,22 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.cassandra.db.streaming; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.function.Function; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.SerializationHeader; import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.compress.CompressionMetadata; import org.apache.cassandra.io.sstable.format.SSTableFormat; @@ -32,6 +38,10 @@ import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static com.google.common.base.Preconditions.checkNotNull; public class CassandraStreamHeader { @@ -50,33 +60,38 @@ public class CassandraStreamHeader private final CompressionMetadata 
compressionMetadata; public volatile CompressionInfo compressionInfo; public final int sstableLevel; - public final SerializationHeader.Component header; + public final SerializationHeader.Component serializationHeader; + + /* flag indicating whether this is a partial or entire sstable transfer */ + public final boolean isEntireSSTable; + /* first token of the sstable required for faster streaming */ + public final DecoratedKey firstKey; + public final TableId tableId; + public final ComponentManifest componentManifest; /* cached size value */ private transient final long size; - private CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) - { - this.version = version; - this.format = format; - this.estimatedKeys = estimatedKeys; - this.sections = sections; - this.compressionMetadata = compressionMetadata; - this.compressionInfo = compressionInfo; - this.sstableLevel = sstableLevel; - this.header = header; - - this.size = calculateSize(); - } - - public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionMetadata compressionMetadata, int sstableLevel, SerializationHeader.Component header) + private CassandraStreamHeader(Builder builder) { - this(version, format, estimatedKeys, sections, compressionMetadata, null, sstableLevel, header); + version = builder.version; + format = builder.format; + estimatedKeys = builder.estimatedKeys; + sections = builder.sections; + compressionMetadata = builder.compressionMetadata; + compressionInfo = builder.compressionInfo; + sstableLevel = builder.sstableLevel; + serializationHeader = builder.serializationHeader; + tableId = builder.tableId; + isEntireSSTable = builder.isEntireSSTable; + componentManifest = 
builder.componentManifest; + firstKey = builder.firstKey; + size = calculateSize(); } - public CassandraStreamHeader(Version version, SSTableFormat.Type format, long estimatedKeys, List<SSTableReader.PartitionPositionBounds> sections, CompressionInfo compressionInfo, int sstableLevel, SerializationHeader.Component header) + public static Builder builder() { - this(version, format, estimatedKeys, sections, null, compressionInfo, sstableLevel, header); + return new Builder(); } public boolean isCompressed() @@ -94,6 +109,9 @@ public class CassandraStreamHeader private long calculateSize() { + if (isEntireSSTable) + return componentManifest.totalSize(); + long transferSize = 0; if (compressionInfo != null) { @@ -112,9 +130,7 @@ public class CassandraStreamHeader public synchronized void calculateCompressionInfo() { if (compressionMetadata != null && compressionInfo == null) - { compressionInfo = CompressionInfo.fromCompressionMetadata(compressionMetadata, sections); - } } @Override @@ -125,9 +141,11 @@ public class CassandraStreamHeader ", format=" + format + ", estimatedKeys=" + estimatedKeys + ", sections=" + sections + - ", compressionInfo=" + compressionInfo + ", sstableLevel=" + sstableLevel + - ", header=" + header + + ", header=" + serializationHeader + + ", isEntireSSTable=" + isEntireSSTable + + ", firstKey=" + firstKey + + ", tableId=" + tableId + '}'; } @@ -138,20 +156,26 @@ public class CassandraStreamHeader CassandraStreamHeader that = (CassandraStreamHeader) o; return estimatedKeys == that.estimatedKeys && sstableLevel == that.sstableLevel && + isEntireSSTable == that.isEntireSSTable && Objects.equals(version, that.version) && format == that.format && Objects.equals(sections, that.sections) && Objects.equals(compressionInfo, that.compressionInfo) && - Objects.equals(header, that.header); + Objects.equals(serializationHeader, that.serializationHeader) && + Objects.equals(componentManifest, that.componentManifest) && + Objects.equals(firstKey, 
that.firstKey) && + Objects.equals(tableId, that.tableId); } public int hashCode() { - return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); + return Objects.hash(version, format, estimatedKeys, sections, compressionInfo, sstableLevel, serializationHeader, componentManifest, + isEntireSSTable, firstKey, tableId); } + public static final IVersionedSerializer<CassandraStreamHeader> serializer = new CassandraStreamHeaderSerializer(); - public static final IVersionedSerializer<CassandraStreamHeader> serializer = new IVersionedSerializer<CassandraStreamHeader>() + public static class CassandraStreamHeaderSerializer implements IVersionedSerializer<CassandraStreamHeader> { public void serialize(CassandraStreamHeader header, DataOutputPlus out, int version) throws IOException { @@ -168,11 +192,33 @@ public class CassandraStreamHeader header.calculateCompressionInfo(); CompressionInfo.serializer.serialize(header.compressionInfo, out, version); out.writeInt(header.sstableLevel); - SerializationHeader.serializer.serialize(header.version, header.header, out); + + SerializationHeader.serializer.serialize(header.version, header.serializationHeader, out); + + header.tableId.serialize(out); + out.writeBoolean(header.isEntireSSTable); + + if (header.isEntireSSTable) + { + ComponentManifest.serializer.serialize(header.componentManifest, out, version); + ByteBufferUtil.writeWithVIntLength(header.firstKey.getKey(), out); + } } public CassandraStreamHeader deserialize(DataInputPlus in, int version) throws IOException { + return deserialize(in, version, tableId -> { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); + if (cfs != null) + return cfs.getPartitioner(); + + return null; + }); + } + + @VisibleForTesting + public CassandraStreamHeader deserialize(DataInputPlus in, int version, Function<TableId, IPartitioner> partitionerMapper) throws IOException + { Version sstableVersion = 
SSTableFormat.Type.current().info.getVersion(in.readUTF()); SSTableFormat.Type format = SSTableFormat.Type.validate(in.readUTF()); @@ -183,9 +229,36 @@ public class CassandraStreamHeader sections.add(new SSTableReader.PartitionPositionBounds(in.readLong(), in.readLong())); CompressionInfo compressionInfo = CompressionInfo.serializer.deserialize(in, version); int sstableLevel = in.readInt(); + SerializationHeader.Component header = SerializationHeader.serializer.deserialize(sstableVersion, in); - return new CassandraStreamHeader(sstableVersion, format, estimatedKeys, sections, compressionInfo, sstableLevel, header); + TableId tableId = TableId.deserialize(in); + boolean isEntireSSTable = in.readBoolean(); + ComponentManifest manifest = null; + DecoratedKey firstKey = null; + + if (isEntireSSTable) + { + manifest = ComponentManifest.serializer.deserialize(in, version); + ByteBuffer keyBuf = ByteBufferUtil.readWithVIntLength(in); + IPartitioner partitioner = partitionerMapper.apply(tableId); + if (partitioner == null) + throw new IllegalArgumentException(String.format("Could not determine partitioner for tableId %s", tableId)); + firstKey = partitioner.decorateKey(keyBuf); + } + + return builder().withSSTableFormat(format) + .withSSTableVersion(sstableVersion) + .withSSTableLevel(sstableLevel) + .withEstimatedKeys(estimatedKeys) + .withSections(sections) + .withCompressionInfo(compressionInfo) + .withSerializationHeader(header) + .withComponentManifest(manifest) + .isEntireSSTable(isEntireSSTable) + .withFirstKey(firstKey) + .withTableId(tableId) + .build(); } public long serializedSize(CassandraStreamHeader header, int version) @@ -201,12 +274,127 @@ public class CassandraStreamHeader size += TypeSizes.sizeof(section.lowerPosition); size += TypeSizes.sizeof(section.upperPosition); } + + header.calculateCompressionInfo(); size += CompressionInfo.serializer.serializedSize(header.compressionInfo, version); size += TypeSizes.sizeof(header.sstableLevel); - size += 
SerializationHeader.serializer.serializedSize(header.version, header.header); + size += SerializationHeader.serializer.serializedSize(header.version, header.serializationHeader); + + size += header.tableId.serializedSize(); + size += TypeSizes.sizeof(header.isEntireSSTable); + if (header.isEntireSSTable) + { + size += ComponentManifest.serializer.serializedSize(header.componentManifest, version); + size += ByteBufferUtil.serializedSizeWithVIntLength(header.firstKey.getKey()); + } return size; } - }; + } + + public static final class Builder + { + private Version version; + private SSTableFormat.Type format; + private long estimatedKeys; + private List<SSTableReader.PartitionPositionBounds> sections; + private CompressionMetadata compressionMetadata; + private CompressionInfo compressionInfo; + private int sstableLevel; + private SerializationHeader.Component serializationHeader; + private ComponentManifest componentManifest; + private boolean isEntireSSTable; + private DecoratedKey firstKey; + private TableId tableId; + + public Builder withSSTableFormat(SSTableFormat.Type format) + { + this.format = format; + return this; + } + + public Builder withSSTableVersion(Version version) + { + this.version = version; + return this; + } + + public Builder withSSTableLevel(int sstableLevel) + { + this.sstableLevel = sstableLevel; + return this; + } + + public Builder withEstimatedKeys(long estimatedKeys) + { + this.estimatedKeys = estimatedKeys; + return this; + } + + public Builder withSections(List<SSTableReader.PartitionPositionBounds> sections) + { + this.sections = sections; + return this; + } + + public Builder withCompressionMetadata(CompressionMetadata compressionMetadata) + { + this.compressionMetadata = compressionMetadata; + return this; + } + + public Builder withCompressionInfo(CompressionInfo compressionInfo) + { + this.compressionInfo = compressionInfo; + return this; + } + + public Builder withSerializationHeader(SerializationHeader.Component header) + { + 
this.serializationHeader = header; + return this; + } + + public Builder withTableId(TableId tableId) + { + this.tableId = tableId; + return this; + } + + public Builder isEntireSSTable(boolean isEntireSSTable) + { + this.isEntireSSTable = isEntireSSTable; + return this; + } + + public Builder withComponentManifest(ComponentManifest componentManifest) + { + this.componentManifest = componentManifest; + return this; + } + + public Builder withFirstKey(DecoratedKey firstKey) + { + this.firstKey = firstKey; + return this; + } + + public CassandraStreamHeader build() + { + checkNotNull(version); + checkNotNull(format); + checkNotNull(sections); + checkNotNull(serializationHeader); + checkNotNull(tableId); + + if (isEntireSSTable) + { + checkNotNull(componentManifest); + checkNotNull(firstKey); + } + + return new CassandraStreamHeader(this); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java index 673b62c..43667d0 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java @@ -47,7 +47,6 @@ import org.apache.cassandra.streaming.StreamReceiver; import org.apache.cassandra.streaming.StreamSession; import org.apache.cassandra.streaming.TableStreamManager; import org.apache.cassandra.streaming.messages.StreamMessageHeader; -import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Ref; import org.apache.cassandra.utils.concurrent.Refs; @@ -152,7 +151,8 @@ public class CassandraStreamManager implements TableStreamManager ref.release(); continue; } - streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, 
sstable.estimatedKeysForRanges(ranges))); + streams.add(new CassandraOutgoingFile(session.getStreamOperation(), ref, sections, ranges, + sstable.estimatedKeysForRanges(ranges))); } return streams; http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java index 3930196..fccabfe 100644 --- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java +++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java @@ -53,7 +53,7 @@ import org.apache.cassandra.utils.FBUtilities; /** * CassandraStreamReader reads from stream and writes to SSTable. */ -public class CassandraStreamReader +public class CassandraStreamReader implements IStreamReader { private static final Logger logger = LoggerFactory.getLogger(CassandraStreamReader.class); protected final TableId tableId; @@ -85,7 +85,7 @@ public class CassandraStreamReader this.pendingRepair = header.pendingRepair; this.format = streamHeader.format; this.sstableLevel = streamHeader.sstableLevel; - this.header = streamHeader.header; + this.header = streamHeader.serializationHeader; this.fileSeqNum = header.sequenceNumber; } @@ -95,6 +95,7 @@ public class CassandraStreamReader * @throws IOException if reading the remote sstable fails. Will throw an RTE if local write fails. 
*/ @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed + @Override public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException { long totalSize = totalSize(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java new file mode 100644 index 0000000..90e3dbd --- /dev/null +++ b/src/java/org/apache/cassandra/db/streaming/ComponentManifest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import com.google.common.collect.Iterators; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; + +public final class ComponentManifest implements Iterable<Component> +{ + private final LinkedHashMap<Component, Long> components; + + public ComponentManifest(Map<Component, Long> components) + { + this.components = new LinkedHashMap<>(components); + } + + public long sizeOf(Component component) + { + Long size = components.get(component); + if (size == null) + throw new IllegalArgumentException("Component " + component + " is not present in the manifest"); + return size; + } + + public long totalSize() + { + long totalSize = 0; + for (Long size : components.values()) + totalSize += size; + return totalSize; + } + + public List<Component> components() + { + return new ArrayList<>(components.keySet()); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof ComponentManifest)) + return false; + + ComponentManifest that = (ComponentManifest) o; + return components.equals(that.components); + } + + @Override + public int hashCode() + { + return components.hashCode(); + } + + public static final IVersionedSerializer<ComponentManifest> serializer = new IVersionedSerializer<ComponentManifest>() + { + public void serialize(ComponentManifest manifest, DataOutputPlus out, int version) throws IOException + { + out.writeUnsignedVInt(manifest.components.size()); + for (Map.Entry<Component, Long> entry : manifest.components.entrySet()) + { + out.writeUTF(entry.getKey().name); + 
out.writeUnsignedVInt(entry.getValue()); + } + } + + public ComponentManifest deserialize(DataInputPlus in, int version) throws IOException + { + int size = (int) in.readUnsignedVInt(); + + LinkedHashMap<Component, Long> components = new LinkedHashMap<>(size); + + for (int i = 0; i < size; i++) + { + Component component = Component.parse(in.readUTF()); + long length = in.readUnsignedVInt(); + components.put(component, length); + } + + return new ComponentManifest(components); + } + + public long serializedSize(ComponentManifest manifest, int version) + { + long size = TypeSizes.sizeofUnsignedVInt(manifest.components.size()); + for (Map.Entry<Component, Long> entry : manifest.components.entrySet()) + { + size += TypeSizes.sizeof(entry.getKey().name); + size += TypeSizes.sizeofUnsignedVInt(entry.getValue()); + } + return size; + } + }; + + @Override + public Iterator<Component> iterator() + { + return Iterators.unmodifiableIterator(components.keySet().iterator()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/47a12c52/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java b/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java deleted file mode 100644 index c71edfb..0000000 --- a/src/java/org/apache/cassandra/db/streaming/CompressedCassandraStreamReader.java +++ /dev/null @@ -1,131 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.streaming; - -import java.io.IOException; - -import com.google.common.base.Throwables; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.io.compress.CompressionMetadata; -import org.apache.cassandra.io.sstable.SSTableMultiWriter; -import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.TrackedDataInputPlus; -import org.apache.cassandra.streaming.ProgressInfo; -import org.apache.cassandra.streaming.StreamSession; -import org.apache.cassandra.streaming.messages.StreamMessageHeader; -import org.apache.cassandra.utils.ChecksumType; -import org.apache.cassandra.utils.FBUtilities; - -import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause; - -/** - * CassandraStreamReader that reads from streamed compressed SSTable - */ -public class CompressedCassandraStreamReader extends CassandraStreamReader -{ - private static final Logger logger = LoggerFactory.getLogger(CompressedCassandraStreamReader.class); - - protected final CompressionInfo compressionInfo; - - public CompressedCassandraStreamReader(StreamMessageHeader header, CassandraStreamHeader streamHeader, StreamSession session) - { - super(header, streamHeader, session); - this.compressionInfo = streamHeader.compressionInfo; - } - - /** - * @return SSTable transferred - * @throws java.io.IOException if reading the remote sstable fails. 
Will throw an RTE if local write fails. - */ - @Override - @SuppressWarnings("resource") // input needs to remain open, streams on top of it can't be closed - public SSTableMultiWriter read(DataInputPlus inputPlus) throws IOException - { - long totalSize = totalSize(); - - ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(tableId); - - if (cfs == null) - { - // schema was dropped during streaming - throw new IOException("CF " + tableId + " was dropped during streaming"); - } - - logger.debug("[Stream #{}] Start receiving file #{} from {}, repairedAt = {}, size = {}, ks = '{}', pendingRepair = '{}', table = '{}'.", - session.planId(), fileSeqNum, session.peer, repairedAt, totalSize, cfs.keyspace.getName(), pendingRepair, - cfs.getTableName()); - - StreamDeserializer deserializer = null; - SSTableMultiWriter writer = null; - try (CompressedInputStream cis = new CompressedInputStream(inputPlus, compressionInfo, ChecksumType.CRC32, cfs::getCrcCheckChance)) - { - TrackedDataInputPlus in = new TrackedDataInputPlus(cis); - deserializer = new StreamDeserializer(cfs.metadata(), in, inputVersion, getHeader(cfs.metadata())); - writer = createWriter(cfs, totalSize, repairedAt, pendingRepair, format); - String filename = writer.getFilename(); - int sectionIdx = 0; - for (SSTableReader.PartitionPositionBounds section : sections) - { - assert cis.getTotalCompressedBytesRead() <= totalSize; - long sectionLength = section.upperPosition - section.lowerPosition; - - logger.trace("[Stream #{}] Reading section {} with length {} from stream.", session.planId(), sectionIdx++, sectionLength); - // skip to beginning of section inside chunk - cis.position(section.lowerPosition); - in.reset(0); - - while (in.getBytesRead() < sectionLength) - { - writePartition(deserializer, writer); - // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred - session.progress(filename, ProgressInfo.Direction.IN, 
cis.getTotalCompressedBytesRead(), totalSize); - } - } - logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum, - session.peer, FBUtilities.prettyPrintMemory(cis.getTotalCompressedBytesRead()), FBUtilities.prettyPrintMemory(totalSize)); - return writer; - } - catch (Throwable e) - { - Object partitionKey = deserializer != null ? deserializer.partitionKey() : ""; - logger.warn("[Stream {}] Error while reading partition {} from stream on ks='{}' and table='{}'.", - session.planId(), partitionKey, cfs.keyspace.getName(), cfs.getTableName()); - if (writer != null) - { - writer.abort(e); - } - if (extractIOExceptionCause(e).isPresent()) - throw e; - throw Throwables.propagate(e); - } - } - - @Override - protected long totalSize() - { - long size = 0; - // calculate total length of transferring chunks - for (CompressionMetadata.Chunk chunk : compressionInfo.chunks) - size += chunk.length + 4; // 4 bytes for CRC - return size; - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
