Merge branch 'cassandra-3.0' into cassandra-3.3

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7feb80d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7feb80d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7feb80d

Branch: refs/heads/trunk
Commit: a7feb80d6bd90a8f8041741e561282e6888a78f2
Parents: cf3dcc2 0f995a2
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Jan 13 13:16:24 2016 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Jan 13 13:16:24 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/ConnectionHandler.java  |  2 +-
 .../cassandra/streaming/StreamReader.java       | 19 ++++++--
 .../cassandra/streaming/StreamWriter.java       | 10 ++++-
 .../compress/CompressedInputStream.java         | 41 ++++++-----------
 .../compress/CompressedStreamReader.java        | 26 +++++++----
 .../compress/CompressedStreamWriter.java        | 15 +++++++
 .../compression/CompressedInputStreamTest.java  | 46 --------------------
 8 files changed, 73 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index fd05c01,614d5b4..aae5efe
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -7,28 -4,6 +7,29 @@@ Merged from 3.0
     tombstone (CASSANDRA-10743)
   * MV should use the maximum timestamp of the primary key (CASSANDRA-10910)
   * Fix potential assertion error during compaction (CASSANDRA-10944)
 +Merged from 2.2:
++ * Fix error streaming section more than 2GB (CASSANDRA-10961)
 + * Histogram buckets exposed in jmx are sorted incorrectly (CASSANDRA-10975)
 + * Enable GC logging by default (CASSANDRA-10140)
 + * Optimize pending range computation (CASSANDRA-9258)
 + * Skip commit log and saved cache directories in SSTable version startup 
check (CASSANDRA-10902)
 + * drop/alter user should be case sensitive (CASSANDRA-10817)
 +Merged from 2.1:
 + * (cqlsh) Add request timeout option to cqlsh (CASSANDRA-10686)
 + * Avoid AssertionError while submitting hint with LWT (CASSANDRA-10477)
 + * If CompactionMetadata is not in stats file, use index summary instead 
(CASSANDRA-10676)
 + * Retry sending gossip syn multiple times during shadow round 
(CASSANDRA-8072)
 + * Fix pending range calculation during moves (CASSANDRA-10887)
 + * Sane default (200Mbps) for inter-DC streaming througput (CASSANDRA-8708)
 +
 +
 +3.2
 + * Make sure tokens don't exist in several data directories (CASSANDRA-6696)
 + * Add requireAuthorization method to IAuthorizer (CASSANDRA-10852)
 + * Move static JVM options to conf/jvm.options file (CASSANDRA-10494)
 + * Fix CassandraVersion to accept x.y version string (CASSANDRA-10931)
 + * Add forceUserDefinedCleanup to allow more flexible cleanup 
(CASSANDRA-10708)
 + * (cqlsh) allow setting TTL with COPY (CASSANDRA-9494)
   * Fix counting of received sstables in streaming (CASSANDRA-10949)
   * Implement hints compression (CASSANDRA-9428)
   * Fix potential assertion error when reading static columns (CASSANDRA-10903)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 61eb13f,268f974..838664d
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -62,7 -62,10 +62,8 @@@ public class StreamReade
      protected final SSTableFormat.Type format;
      protected final int sstableLevel;
      protected final SerializationHeader.Component header;
+     protected final int fileSeqNum;
  
 -    protected Descriptor desc;
 -
      public StreamReader(FileMessageHeader header, StreamSession session)
      {
          this.session = session;
@@@ -106,8 -116,10 +114,10 @@@
              {
                  writePartition(deserializer, writer);
                  // TODO move this to BytesReadTracker
 -                session.progress(desc, ProgressInfo.Direction.IN, 
in.getBytesRead(), totalSize);
 +                session.progress(writer.getFilename(), 
ProgressInfo.Direction.IN, in.getBytesRead(), totalSize);
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} 
readBytes = {}, totalSize = {}",
+                          session.planId(), fileSeqNum, session.peer, 
in.getBytesRead(), totalSize);
              return writer;
          }
          catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/StreamWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index c123102,5210d5b..5a47787
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -24,8 -24,8 +24,9 @@@ import java.nio.channels.ReadableByteCh
  
  import com.google.common.base.Throwables;
  
+ import org.apache.cassandra.db.DecoratedKey;
  import org.apache.cassandra.io.sstable.SSTableMultiWriter;
 +import org.apache.cassandra.io.sstable.format.RangeAwareSSTableWriter;
  
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -96,9 -104,11 +105,11 @@@ public class CompressedStreamReader ext
                  {
                      writePartition(deserializer, writer);
                      // when compressed, report total bytes of compressed 
chunks read since remoteFile.size is the sum of chunks transferred
 -                    session.progress(desc, ProgressInfo.Direction.IN, 
cis.getTotalCompressedBytesRead(), totalSize);
 +                    session.progress(writer.getFilename(), 
ProgressInfo.Direction.IN, cis.getTotalCompressedBytesRead(), totalSize);
                  }
              }
+             logger.debug("[Stream #{}] Finished receiving file #{} from {} 
readBytes = {}, totalSize = {}", session.planId(), fileSeqNum,
+                          session.peer, cis.getTotalCompressedBytesRead(), 
totalSize);
              return writer;
          }
          catch (Throwable e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7feb80d/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
index 93e0f76,f37af29..657da88
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamWriter.java
@@@ -25,8 -25,10 +25,11 @@@ import java.util.List
  
  import com.google.common.base.Function;
  
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
  import org.apache.cassandra.io.compress.CompressionMetadata;
 +import org.apache.cassandra.io.sstable.Component;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.util.ChannelProxy;
  import org.apache.cassandra.io.util.DataOutputStreamPlus;
@@@ -76,9 -88,11 +89,11 @@@ public class CompressedStreamWriter ext
                      long lastWrite = out.applyToChannel((wbc) -> 
fc.transferTo(section.left + bytesTransferredFinal, toTransfer, wbc));
                      bytesTransferred += lastWrite;
                      progress += lastWrite;
 -                    session.progress(sstable.descriptor, 
ProgressInfo.Direction.OUT, progress, totalSize);
 +                    
session.progress(sstable.descriptor.filenameFor(Component.DATA), 
ProgressInfo.Direction.OUT, progress, totalSize);
                  }
              }
+             logger.debug("[Stream #{}] Finished streaming file {} to {}, 
bytesTransferred = {}, totalSize = {}",
+                          session.planId(), sstable.getFilename(), 
session.peer, progress, totalSize);
          }
      }
  

Reply via email to