Merge branch 'cassandra-3.0' into cassandra-3.9

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8458e4e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8458e4e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8458e4e3

Branch: refs/heads/cassandra-3.8
Commit: 8458e4e30744e8df6fcbe12aec286db047411385
Parents: ee60941 62ef861
Author: Yuki Morishita <yu...@apache.org>
Authored: Tue Aug 9 16:56:29 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Aug 9 16:56:29 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/cassandra/config/Config.java     |  4 ++
 .../cassandra/config/DatabaseDescriptor.java    |  5 --
 .../cassandra/streaming/StreamReader.java       | 26 +---------
 .../cassandra/streaming/StreamSession.java      | 36 +-------------
 .../compress/CompressedInputStream.java         | 21 +++++++-
 .../compress/CompressedStreamReader.java        | 10 ++--
 .../streaming/messages/IncomingFileMessage.java | 22 ++-------
 .../streaming/messages/RetryMessage.java        |  4 ++
 .../org/apache/cassandra/utils/Throwables.java  | 14 ++++++
 .../compression/CompressedInputStreamTest.java  | 52 +++++++++++++++++---
 11 files changed, 100 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index b7bbf72,f613c5f..8fbd774
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -22,47 -15,6 +22,48 @@@ Merged from 3.0
     to connect with too low of a protocol version (CASSANDRA-11464)
   * NullPointerExpception when reading/compacting table (CASSANDRA-11988)
   * Fix problem with undeleteable rows on upgrade to new sstable format 
(CASSANDRA-12144)
 +Merged from 2.2:
++ * Fix hanging stream session (CASSANDRA-10992)
 + * Fix INSERT JSON, fromJson() support of smallint, tinyint types 
(CASSANDRA-12371)
 + * Release sstables of failed stream sessions only when outgoing transfers 
are finished (CASSANDRA-11345)
 + * Wait for tracing events before returning response and query at same 
consistency level client side (CASSANDRA-11465)
 + * cqlsh copyutil should get host metadata by connected address 
(CASSANDRA-11979)
 + * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
 +Merged from 2.1:
 + * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850)
 + * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040)
 +
 +
 +3.8
 + * RTE from new CDC column breaks in flight queries (CASSANDRA-12236)
 + * Fix hdr logging for single operation workloads (CASSANDRA-12145)
 + * Fix SASI PREFIX search in CONTAINS mode with partial terms 
(CASSANDRA-12073)
 + * Increase size of flushExecutor thread pool (CASSANDRA-12071)
 + * Partial revert of CASSANDRA-11971, cannot recycle buffer in 
SP.sendMessagesToNonlocalDC (CASSANDRA-11950)
 + * Upgrade netty to 4.0.39 (CASSANDRA-12032, CASSANDRA-12034)
 + * Improve details in compaction log message (CASSANDRA-12080)
 + * Allow unset values in CQLSSTableWriter (CASSANDRA-11911)
 + * Chunk cache to request compressor-compatible buffers if pool space is 
exhausted (CASSANDRA-11993)
 + * Remove DatabaseDescriptor dependencies from SequentialWriter 
(CASSANDRA-11579)
 + * Move skip_stop_words filter before stemming (CASSANDRA-12078)
 + * Support seek() in EncryptedFileSegmentInputStream (CASSANDRA-11957)
 + * SSTable tools mishandling LocalPartitioner (CASSANDRA-12002)
 + * When SEPWorker assigned work, set thread name to match pool 
(CASSANDRA-11966)
 + * Add cross-DC latency metrics (CASSANDRA-11596)
 + * Allow terms in selection clause (CASSANDRA-10783)
 + * Add bind variables to trace (CASSANDRA-11719)
 + * Switch counter shards' clock to timestamps (CASSANDRA-9811)
 + * Introduce HdrHistogram and response/service/wait separation to stress tool 
(CASSANDRA-11853)
 + * entry-weighers in QueryProcessor should respect partitionKeyBindIndexes 
field (CASSANDRA-11718)
 + * Support older ant versions (CASSANDRA-11807)
 + * Estimate compressed on disk size when deciding if sstable size limit 
reached (CASSANDRA-11623)
 + * cassandra-stress profiles should support case sensitive schemas 
(CASSANDRA-11546)
 + * Remove DatabaseDescriptor dependency from FileUtils (CASSANDRA-11578)
 + * Faster streaming (CASSANDRA-9766)
 + * Add prepared query parameter to trace for "Execute CQL3 prepared query" 
session (CASSANDRA-11425)
 + * Add repaired percentage metric (CASSANDRA-11503)
 + * Add Change-Data-Capture (CASSANDRA-8844)
 +Merged from 3.0:
   * Fix paging logic for deleted partitions with static columns 
(CASSANDRA-12107)
   * Wait until the message is being send to decide which serializer must be 
used (CASSANDRA-11393)
   * Fix migration of static thrift column names with non-text comparators 
(CASSANDRA-12147)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 0cd6329,4ca7937..816f028
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -154,32 -153,12 +151,13 @@@ public class StreamReade
      {
          Directories.DataDirectory localDir = 
cfs.getDirectories().getWriteableLocation(totalSize);
          if (localDir == null)
 -            throw new IOException("Insufficient disk space to store " + 
totalSize + " bytes");
 -        desc = 
Descriptor.fromFilename(cfs.getSSTablePath(cfs.getDirectories().getLocationForDisk(localDir),
 format));
 +            throw new IOException(String.format("Insufficient disk space to 
store %s", FBUtilities.prettyPrintMemory(totalSize)));
  
 -        return cfs.createSSTableMultiWriter(desc, estimatedKeys, repairedAt, 
sstableLevel, getHeader(cfs.metadata), session.getTransaction(cfId));
 +        RangeAwareSSTableWriter writer = new RangeAwareSSTableWriter(cfs, 
estimatedKeys, repairedAt, format, sstableLevel, totalSize, 
session.getTransaction(cfId), getHeader(cfs.metadata));
 +        StreamHook.instance.reportIncomingFile(cfs, writer, session, 
fileSeqNum);
 +        return writer;
      }
  
-     protected void drain(InputStream dis, long bytesRead) throws IOException
-     {
-         long toSkip = totalSize() - bytesRead;
- 
-         // InputStream.skip can return -1 if dis is inaccessible.
-         long skipped = dis.skip(toSkip);
-         if (skipped == -1)
-             return;
- 
-         toSkip = toSkip - skipped;
-         while (toSkip > 0)
-         {
-             skipped = dis.skip(toSkip);
-             if (skipped == -1)
-                 break;
-             toSkip = toSkip - skipped;
-         }
-     }
- 
      protected long totalSize()
      {
          long size = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
index 3aaa1a3,d59849f..81abefa
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedInputStream.java
@@@ -79,31 -90,32 +88,37 @@@ public class CompressedInputStream exte
          // buffer is limited to store up to 1024 chunks
          this.dataBuffer = new 
ArrayBlockingQueue<>(Math.min(info.chunks.length, 1024));
          this.crcCheckChanceSupplier = crcCheckChanceSupplier;
 +        this.checksumType = checksumType;
  
 -        new Thread(new Reader(source, info, dataBuffer)).start();
 +        new FastThreadLocalThread(new Reader(source, info, 
dataBuffer)).start();
      }
  
 -    public int read() throws IOException
 +    private void decompressNextChunk() throws IOException
      {
+         if (readException != null)
+             throw readException;
+ 
 -        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
 +        try
          {
 -            try
 +            byte[] compressedWithCRC = dataBuffer.take();
 +            if (compressedWithCRC == POISON_PILL)
-                 throw new EOFException("No chunk available");
+             {
 -                byte[] compressedWithCRC = dataBuffer.take();
 -                if (compressedWithCRC == POISON_PILL)
 -                {
 -                    assert readException != null;
 -                    throw readException;
 -                }
 -                decompress(compressedWithCRC);
 -            }
 -            catch (InterruptedException e)
 -            {
 -                throw new EOFException("No chunk available");
++                assert readException != null;
++                throw readException;
+             }
 +            decompress(compressedWithCRC);
 +        }
 +        catch (InterruptedException e)
 +        {
 +            throw new EOFException("No chunk available");
          }
 +    }
 +
 +    @Override
 +    public int read() throws IOException
 +    {
 +        if (current >= bufferOffset + buffer.length || validBufferBytes == -1)
 +            decompressNextChunk();
  
          assert current >= bufferOffset && current < bufferOffset + 
validBufferBytes;
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 8dafa9c,bc87c8f..70b5765
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -35,9 -36,11 +35,11 @@@ import org.apache.cassandra.streaming.P
  import org.apache.cassandra.streaming.StreamReader;
  import org.apache.cassandra.streaming.StreamSession;
  import org.apache.cassandra.streaming.messages.FileMessageHeader;
 -import org.apache.cassandra.io.util.TrackedInputStream;
 +import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;
  
+ import static org.apache.cassandra.utils.Throwables.extractIOExceptionCause;
+ 
  /**
   * StreamReader that reads from streamed compressed SSTable
   */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/src/java/org/apache/cassandra/streaming/messages/IncomingFileMessage.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8458e4e3/test/unit/org/apache/cassandra/streaming/compression/CompressedInputStreamTest.java
----------------------------------------------------------------------

Reply via email to