Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/cc8f6cc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/cc8f6cc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/cc8f6cc5

Branch: refs/heads/trunk
Commit: cc8f6cc510f3799dde89c9e1e3cbf7515c2113f9
Parents: 52be7ba 0398521
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Aug 3 20:18:08 2016 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Aug 3 20:18:08 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/streaming/StreamSession.java      |  3 +-
 .../cassandra/streaming/StreamTransferTask.java |  4 +-
 .../streaming/messages/FileMessageHeader.java   | 20 +++--
 .../streaming/messages/OutgoingFileMessage.java | 38 ++++++++-
 .../streaming/StreamTransferTaskTest.java       | 85 ++++++++++++++++++--
 6 files changed, 136 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index f0ceb70,87228d3..49733d3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,32 -1,6 +1,33 @@@
 -2.2.8
 +3.0.9
 + * Fixed ConcurrentModificationException when reading metrics in GraphiteReporter (CASSANDRA-11823)
 + * Fix upgrade of super columns on thrift (CASSANDRA-12335)
 + * Fixed flaky BlacklistingCompactionsTest, switched to fixed size types and increased corruption size (CASSANDRA-12359)
 + * Rerun ReplicationAwareTokenAllocatorTest on failure to avoid flakiness (CASSANDRA-12277)
 + * Exception when computing read-repair for range tombstones (CASSANDRA-12263)
 + * Lost counter writes in compact table and static columns (CASSANDRA-12219)
 + * AssertionError with MVs on updating a row that isn't indexed due to a null value (CASSANDRA-12247)
 + * Disable RR and speculative retry with EACH_QUORUM reads (CASSANDRA-11980)
 + * Add option to override compaction space check (CASSANDRA-12180)
 + * Faster startup by only scanning each directory for temporary files once (CASSANDRA-12114)
 + * Respond with v1/v2 protocol header when responding to driver that attempts
 +   to connect with too low of a protocol version (CASSANDRA-11464)
 + * NullPointerException when reading/compacting table (CASSANDRA-11988)
 + * Fix problem with undeletable rows on upgrade to new sstable format (CASSANDRA-12144)
 + * Fix paging logic for deleted partitions with static columns (CASSANDRA-12107)
 + * Wait until the message is being sent to decide which serializer must be used (CASSANDRA-11393)
 + * Fix migration of static thrift column names with non-text comparators (CASSANDRA-12147)
 + * Fix upgrading sparse tables that are incorrectly marked as dense (CASSANDRA-11315)
 + * Fix reverse queries ignoring range tombstones (CASSANDRA-11733)
 + * Avoid potential race when rebuilding CFMetaData (CASSANDRA-12098)
 + * Avoid missing sstables when getting the canonical sstables (CASSANDRA-11996)
 + * Always select the live sstables when getting sstables in bounds (CASSANDRA-11944)
 + * Fix column ordering of results with static columns for Thrift requests in
 +   a mixed 2.x/3.x cluster, also fix potential non-resolved duplication of
 +   those static columns in query results (CASSANDRA-12123)
 + * Avoid digest mismatch with empty but static rows (CASSANDRA-12090)
 + * Fix EOF exception when altering column type (CASSANDRA-11820)
 +Merged from 2.2:
+  * Release sstables of failed stream sessions only when outgoing transfers are finished (CASSANDRA-11345)
 - * Revert CASSANDRA-11427 (CASSANDRA-12351)
   * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
   * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
   * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
index 2b5047d,b2af699..0e06bc0
--- a/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
+++ b/src/java/org/apache/cassandra/streaming/messages/FileMessageHeader.java
@@@ -61,11 -58,13 +61,14 @@@ public class FileMessageHeade
      private final CompressionMetadata compressionMetadata;
      public final long repairedAt;
      public final int sstableLevel;
 +    public final SerializationHeader.Component header;
  
+     /* cached size value */
+     private transient final long size;
+ 
      public FileMessageHeader(UUID cfId,
                               int sequenceNumber,
 -                             String version,
 +                             Version version,
                               SSTableFormat.Type format,
                               long estimatedKeys,
                               List<Pair<Long, Long>> sections,
@@@ -84,7 -82,7 +87,8 @@@
          this.compressionMetadata = null;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public FileMessageHeader(UUID cfId,
@@@ -108,7 -105,7 +112,8 @@@
          this.compressionMetadata = compressionMetadata;
          this.repairedAt = repairedAt;
          this.sstableLevel = sstableLevel;
 +        this.header = header;
+         this.size = calculateSize();
      }
  
      public boolean isCompressed()
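
The hunks above add the SerializationHeader.Component field and cache the header's serialized size in a transient field, computed once per constructor via calculateSize() instead of on every lookup. A minimal sketch of that compute-once caching pattern, with placeholder fields and a placeholder size calculation (not the commit's actual calculateSize() body):

    // Sketch only: the "compute once in the constructor, cache in a transient final
    // field" pattern used for FileMessageHeader.size. The arithmetic is a placeholder.
    final class CachedSizeHeader
    {
        private final long estimatedKeys;
        private final int sstableLevel;
        private transient final long size; // cached so repeated size() calls do no work

        CachedSizeHeader(long estimatedKeys, int sstableLevel)
        {
            this.estimatedKeys = estimatedKeys;
            this.sstableLevel = sstableLevel;
            this.size = calculateSize(); // computed exactly once, at construction time
        }

        long size()
        {
            return size;
        }

        private long calculateSize()
        {
            // placeholder standing in for summing the serialized sizes of all header fields
            return Long.BYTES /* estimatedKeys */ + Integer.BYTES /* sstableLevel */;
        }
    }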

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/src/java/org/apache/cassandra/streaming/messages/OutgoingFileMessage.java
----------------------------------------------------------------------
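
The OutgoingFileMessage changes themselves are not reproduced in this message, but the test added below drives the new startTransfer()/finishTransfer() hooks: failing a stream session while a file is mid-transfer must not release the sstable reference until that transfer finishes. A rough sketch of that deferred-release idea, with hypothetical field names and logic (not the actual OutgoingFileMessage implementation):

    // Sketch of the deferred-release pattern exercised by the test below;
    // not the real OutgoingFileMessage code.
    import org.apache.cassandra.io.sstable.format.SSTableReader;
    import org.apache.cassandra.utils.concurrent.Ref;

    class DeferredReleaseSketch
    {
        private final Ref<SSTableReader> ref; // keeps the sstable alive while queued or streaming
        private boolean transferring = false; // a transfer of this file is currently in flight
        private boolean completed = false;    // message completed or aborted (e.g. session failed)

        DeferredReleaseSketch(Ref<SSTableReader> ref)
        {
            this.ref = ref;
        }

        synchronized void startTransfer()
        {
            transferring = true;
        }

        synchronized void finishTransfer()
        {
            transferring = false;
            if (completed)
                ref.release(); // release was deferred because the session failed mid-transfer
        }

        synchronized void complete()
        {
            completed = true;
            if (!transferring)
                ref.release(); // nothing is reading the sstable, release immediately
        }
    }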

http://git-wip-us.apache.org/repos/asf/cassandra/blob/cc8f6cc5/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
index 9572552,02af9a7..dce56eb
--- a/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
+++ b/test/unit/org/apache/cassandra/streaming/StreamTransferTaskTest.java
@@@ -24,7 -28,9 +28,8 @@@ import java.util.concurrent.Cancellatio
  import java.util.concurrent.Future;
  import java.util.concurrent.TimeUnit;
  
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.junit.BeforeClass;
+ import org.junit.After;
  import org.junit.Test;
  
  import junit.framework.Assert;
@@@ -34,9 -41,10 +39,11 @@@ import org.apache.cassandra.db.Keyspace
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.locator.SimpleStrategy;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
  import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.concurrent.Ref;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNull;
@@@ -103,4 -115,68 +114,68 @@@ public class StreamTransferTaskTes
          // when all streaming are done, time out task should not be scheduled.
          assertNull(task.scheduleTimeout(1, 1, TimeUnit.SECONDS));
      }
+ 
+     @Test
+     public void testFailSessionDuringTransferShouldNotReleaseReferences() throws Exception
+     {
+         InetAddress peer = FBUtilities.getBroadcastAddress();
+         StreamCoordinator streamCoordinator = new StreamCoordinator(1, true, false, null);
+         StreamResultFuture future = StreamResultFuture.init(UUID.randomUUID(), "", Collections.<StreamEventHandler>emptyList(), streamCoordinator);
+         StreamSession session = new StreamSession(peer, peer, null, 0, true, false);
+         session.init(future);
+         ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+ 
+         // create two sstables
+         for (int i = 0; i < 2; i++)
+         {
+             SchemaLoader.insertData(KEYSPACE1, CF_STANDARD, i, 1);
+             cfs.forceBlockingFlush();
+         }
+ 
+         // create streaming task that streams those two sstables
+         StreamTransferTask task = new StreamTransferTask(session, cfs.metadata.cfId);
+         List<Ref<SSTableReader>> refs = new ArrayList<>(cfs.getSSTables().size());
 -        for (SSTableReader sstable : cfs.getSSTables())
++        for (SSTableReader sstable : cfs.getLiveSSTables())
+         {
+             List<Range<Token>> ranges = new ArrayList<>();
+             ranges.add(new Range<>(sstable.first.getToken(), sstable.last.getToken()));
+             Ref<SSTableReader> ref = sstable.selfRef();
+             refs.add(ref);
+             task.addTransferFile(ref, 1, sstable.getPositionsForRanges(ranges), 0);
+         }
+         assertEquals(2, task.getTotalNumberOfFiles());
+ 
+         //add task to stream session, so it is aborted when stream session fails
+         session.transfers.put(UUID.randomUUID(), task);
+ 
+         //make a copy of outgoing file messages, since task is cleared when it's aborted
+         Collection<OutgoingFileMessage> files = new LinkedList<>(task.files.values());
+ 
+         //simulate start transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.startTransfer();
+         }
+ 
+         //fail stream session mid-transfer
+         session.onError(new Exception("Fake exception"));
+ 
+         //make sure reference was not released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(1, ref.globalCount());
+         }
+ 
+         //simulate finish transfer
+         for (OutgoingFileMessage file : files)
+         {
+             file.finishTransfer();
+         }
+ 
+         //now reference should be released
+         for (Ref<SSTableReader> ref : refs)
+         {
+             assertEquals(0, ref.globalCount());
+         }
+     }
  }
