Merge branch 'cassandra-3.0' into cassandra-3.3
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7feb80d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7feb80d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7feb80d Branch: refs/heads/trunk Commit: a7feb80d6bd90a8f8041741e561282e6888a78f2 Parents: cf3dcc2 0f995a2 Author: Yuki Morishita <yu...@apache.org> Authored: Wed Jan 13 13:16:24 2016 -0600 Committer: Yuki Morishita <yu...@apache.org> Committed: Wed Jan 13 13:16:24 2016 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/streaming/ConnectionHandler.java | 2 +- .../cassandra/streaming/StreamReader.java | 19 ++++++-- .../cassandra/streaming/StreamWriter.java | 10 ++++- .../compress/CompressedInputStream.java | 41 ++++++----------- .../compress/CompressedStreamReader.java | 26 +++++++---- .../compress/CompressedStreamWriter.java | 15 +++++++ .../compression/CompressedInputStreamTest.java | 46 -------------------- 8 files changed, 73 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index fd05c01,614d5b4..aae5efe --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -7,28 -4,6 +7,29 @@@ Merged from 3.0 tombstone (CASSANDRA-10743) * MV should use the maximum timestamp of the primary key (CASSANDRA-10910) * Fix potential assertion error during compaction (CASSANDRA-10944) +Merged from 2.2: ++ * Fix error streaming section more than 2GB (CASSANDRA-10961) + * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975) + * Enable GC logging by default (CASSANDRA-10140) + * Optimize pending range computation (CASSANDRA-9258) + * Skip commit log and saved cache directories in SSTable version startup check (CASSANDRA-10902) + * drop/alter user should be case sensitive (CASSANDRA-10817) +Merged from 2.1: + * (cqlsh) Add request timeout option to cqlsh (CASSANDRA-10686) + * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477) + * If CompactionMetadata is not in stats file, use index summary instead (CASSANDRA-10676) + * Retry sending gossip syn multiple times during shadow round (CASSANDRA-8072) + * Fix pending range calculation during moves (CASSANDRA-10887) + * Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-8708) + + +3.2 + * Make sure tokens don't exist in several data directories (CASSANDRA-6696) + * Add requireAuthorization method to IAuthorizer (CASSANDRA-10852) + * Move static JVM options to conf/jvm.options file (CASSANDRA-10494) + * Fix CassandraVersion to accept x.y version string (CASSANDRA-10931) + * Add forceUserDefinedCleanup to allow more flexible cleanup (CASSANDRA-10708) + * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494) * Fix counting of received sstables in streaming (CASSANDRA-10949) * Implement hints compression (CASSANDRA-9428) * Fix potential assertion error when reading static columns (CASSANDRA-10903) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java index 61eb13f,268f974..838664d --- a/src/java/org/apache/cassandra/streaming/StreamReader.java +++ b/src/java/org/apache/cassandra/streaming/StreamReader.java @@@ -62,7 -62,10 +62,8 @@@ public class StreamReade protected final SSTableFormat.Type format; protected final int sstableLevel; protected final SerializationHeader.Component header; + protected final int fileSeqNum; - protected Descriptor desc; - public StreamReader(FileMessageHeader header, StreamSession session) { this.session = session; @@@ -106,8 -116,10 +114,10 @@@ { writePartition(deserializer, writer); // TODO move this to BytesReadTracker - session.progress(desc, ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); + session.progress(writer.getFilename(), ProgressInfo.Direction.IN, in.getBytesRead(), totalSize); } + logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", + session.planId(), fileSeqNum, session.peer, in.getBytesRead(), totalSize); return writer; } catch (Throwable e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java index c123102,5210d5b..5a47787 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java @@@ -24,8 -24,8 +24,9 @@@ import java.nio.channels.ReadableByteCh import com.google.common.base.Throwables; + import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.io.sstable.SSTableMultiWriter; +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -96,9 -104,11 +105,11 @@@ public class CompressedStreamReader ext { writePartition(deserializer, writer); // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred - session.progress(desc, ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); + session.progress(writer.getFilename(), ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize); } } + logger.debug("[Stream #{}] Finished receiving file #{} from {} readBytes = {}, totalSize = {}", session.planId(), fileSeqNum, + session.peer, cis.getTotalCompressedBytesRead(), totalSize); return writer; } catch (Throwable e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java index 93e0f76,f37af29..657da88 --- a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java +++ b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java @@@ -25,8 -25,10 +25,11 @@@ import java.util.List import com.google.common.base.Function; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.apache.cassandra.io.compress.CompressionMetadata; +import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.util.ChannelProxy; import org.apache.cassandra.io.util.DataOutputStreamPlus; @@@ -76,9 -88,11 +89,11 @@@ public class CompressedStreamWriter ext long lastWrite = out.applyToChannel((wbc) -> fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc)); bytesTransferred += lastWrite; progress += lastWrite; - session.progress(sstable.descriptor, ProgressInfo.Direction.OUT, progress, totalSize); + session.progress(sstable.descriptor.filenameFor(Component.DATA), ProgressInfo.Direction.OUT, progress, totalSize); } } + logger.debug("[Stream #{}] Finished streaming file {} to {}, bytesTransferred = {}, totalSize = {}", + session.planId(), sstable.getFilename(), session.peer, progress, totalSize); } }