Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/87f5e2e3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/87f5e2e3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/87f5e2e3

Branch: refs/heads/cassandra-3.0
Commit: 87f5e2e39c1003c36eba97a92721920f87db3fed
Parents: 0ad0de1 a549bd0
Author: Yuki Morishita <[email protected]>
Authored: Tue Nov 3 10:03:34 2015 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Tue Nov 3 10:03:34 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                             |  1 +
 .../org/apache/cassandra/streaming/StreamReader.java    | 12 ++++++++----
 .../streaming/compress/CompressedStreamReader.java      | 11 ++++++++---
 3 files changed, 17 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 1724f01,5c23acf..e0208c6
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,26 -1,12 +1,27 @@@
 -2.2.4
 - * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581)
 +3.0
 + * Fix implementation of LegacyLayout.LegacyBoundComparator (CASSANDRA-10602)
 + * Don't use 'names query' read path for counters (CASSANDRA-10572)
 + * Fix backward compatibility for counters (CASSANDRA-10470)
 + * Remove memory_allocator parameter from cassandra.yaml (CASSANDRA-10581)
 + * Execute the metadata reload task of all registered indexes on CFS::reload 
(CASSANDRA-10604)
 + * Fix thrift cas operations with defined columns (CASSANDRA-10576)
 + * Fix PartitionUpdate.operationCount() for updates with static column 
operations (CASSANDRA-10606)
 + * Fix thrift get() queries with defined columns (CASSANDRA-10586)
 + * Fix marking of indexes as built and removed (CASSANDRA-10601)
 + * Skip initialization of non-registered 2i instances, remove 
Index::getIndexName (CASSANDRA-10595)
 + * Fix batches on multiple tables (CASSANDRA-10554)
 + * Ensure compaction options are validated when updating KeyspaceMetadata 
(CASSANDRA-10569)
 + * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
 + * Remove token generator (CASSANDRA-5261)
 + * RolesCache should not be created for any authenticator that does not 
requireAuthentication (CASSANDRA-10562)
 + * Fix LogTransaction checking only a single directory for files 
(CASSANDRA-10421)
 + * Fix handling of range tombstones when reading old format sstables 
(CASSANDRA-10360)
 + * Aggregate with Initial Condition fails with C* 3.0 (CASSANDRA-10367)
 +Merged from 2.2:
   * Expose phi values from failure detector via JMX and tweak debug
     and trace logging (CASSANDRA-9526)
 - * Fix RangeNamesQueryPager (CASSANDRA-10509)
 - * Deprecate Pig support (CASSANDRA-10542)
 - * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
  Merged from 2.1:
+  * Fix streaming to catch exception so retry not fail (CASSANDRA-10557)
   * Add validation method to PerRowSecondaryIndex (CASSANDRA-10092)
   * Support encrypted and plain traffic on the same port (CASSANDRA-10559)
   * Do STCS in DTCS windows (CASSANDRA-10276)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/StreamReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamReader.java
index 879491e,1ccebb0..6169494
--- a/src/java/org/apache/cassandra/streaming/StreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReader.java
@@@ -97,25 -94,34 +97,29 @@@ public class StreamReade
          }
          ColumnFamilyStore cfs = 
Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, 
format);
- 
          DataInputStream dis = new DataInputStream(new 
LZFInputStream(Channels.newInputStream(channel)));
          BytesReadTracker in = new BytesReadTracker(dis);
 -        SSTableWriter writer = null;
 +        StreamDeserializer deserializer = new 
StreamDeserializer(cfs.metadata, in, inputVersion, 
header.toHeader(cfs.metadata));
++        SSTableMultiWriter writer = null;
          try
          {
+             writer = createWriter(cfs, totalSize, repairedAt, format);
              while (in.getBytesRead() < totalSize)
              {
 -                writeRow(writer, in, cfs);
 -
 +                writePartition(deserializer, writer, cfs);
                  // TODO move this to BytesReadTracker
                  session.progress(desc, ProgressInfo.Direction.IN, 
in.getBytesRead(), totalSize);
              }
              return writer;
 -        } catch (Throwable e)
 +        }
 +        catch (Throwable e)
          {
-             SSTableMultiWriter.abortOrDie(writer);
- 
+             if (writer != null)
+             {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can 
drain unread stream
 -                    e.addSuppressed(e2);
 -                }
++                Throwable e2 = writer.abort(null);
++                // add abort error to original and continue so we can drain 
unread stream
++                e.addSuppressed(e2);
+             }
              drain(dis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/87f5e2e3/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
----------------------------------------------------------------------
diff --cc 
src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
index 30cafef,facb906..fca6aa7
--- 
a/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
+++ 
b/src/java/org/apache/cassandra/streaming/compress/CompressedStreamReader.java
@@@ -74,14 -75,12 +74,14 @@@ public class CompressedStreamReader ext
          }
          ColumnFamilyStore cfs = 
Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
  
-         SSTableMultiWriter writer = createWriter(cfs, totalSize, repairedAt, 
format);
- 
 -        CompressedInputStream cis = new 
CompressedInputStream(Channels.newInputStream(channel), compressionInfo);
 +        CompressedInputStream cis = new 
CompressedInputStream(Channels.newInputStream(channel), compressionInfo,
 +                                                              
inputVersion.compressedChecksumType(), cfs::getCrcCheckChance);
          BytesReadTracker in = new BytesReadTracker(new DataInputStream(cis));
 -        SSTableWriter writer = null;
 +        StreamDeserializer deserializer = new 
StreamDeserializer(cfs.metadata, in, inputVersion, 
header.toHeader(cfs.metadata));
++        SSTableMultiWriter writer = null;
          try
          {
+             writer = createWriter(cfs, totalSize, repairedAt, format);
              for (Pair<Long, Long> section : sections)
              {
                  assert cis.getTotalCompressedBytesRead() <= totalSize;
@@@ -102,7 -102,18 +102,12 @@@
          }
          catch (Throwable e)
          {
-             SSTableMultiWriter.abortOrDie(writer);
+             if (writer != null)
+             {
 -                try
 -                {
 -                    writer.abort();
 -                }
 -                catch (Throwable e2)
 -                {
 -                    // add abort error to original and continue so we can 
drain unread stream
 -                    e.addSuppressed(e2);
 -                }
++                Throwable e2 = writer.abort(null);
++                // add abort error to original and continue so we can drain 
unread stream
++                e.addSuppressed(e2);
+             }
              drain(cis, in.getBytesRead());
              if (e instanceof IOException)
                  throw (IOException) e;

Reply via email to