Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/73a8341f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/73a8341f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/73a8341f

Branch: refs/heads/cassandra-3.0
Commit: 73a8341fef25de7236bc591e84cddc637c0b7b2f
Parents: 3d211e9 05f8a00
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Jun 13 15:14:28 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Jun 13 15:14:28 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/config/DatabaseDescriptor.java    |  4 ++
 .../org/apache/cassandra/db/lifecycle/View.java | 11 +++
 .../cassandra/streaming/StreamSession.java      | 18 +++--
 .../io/sstable/SSTableRewriterTest.java         | 75 +++++++++++++++++---
 5 files changed, 88 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 47aef7e,491f72a..8a04077
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,28 -1,5 +1,29 @@@
 -2.2.7
 +3.0.8
 + * Add TimeWindowCompactionStrategy (CASSANDRA-9666)
 +Merged from 2.2:
   * StorageService shutdown hook should use a volatile variable (CASSANDRA-11984)
 +Merged from 2.1:
++ * Create interval tree over canonical sstables to avoid missing sstables during streaming (CASSANDRA-11886)
 + * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid corrupting SSL connections (CASSANDRA-11749)
 +
 +
 +3.0.7
 + * Fix legacy serialization of Thrift-generated non-compound range tombstones
 +   when communicating with 2.x nodes (CASSANDRA-11930)
 + * Fix Directories instantiations where CFS.initialDirectories should be used (CASSANDRA-11849)
 + * Avoid referencing DatabaseDescriptor in AbstractType (CASSANDRA-11912)
 + * Fix sstables not being protected from removal during index build (CASSANDRA-11905)
 + * cqlsh: Suppress stack trace from Read/WriteFailures (CASSANDRA-11032)
 + * Remove unneeded code to repair index summaries that have
 +   been improperly down-sampled (CASSANDRA-11127)
 + * Avoid WriteTimeoutExceptions during commit log replay due to materialized
 +   view lock contention (CASSANDRA-11891)
 + * Prevent OOM failures on SSTable corruption, improve tests for corruption detection (CASSANDRA-9530)
 + * Use CFS.initialDirectories when clearing snapshots (CASSANDRA-11705)
 + * Allow compaction strategies to disable early open (CASSANDRA-11754)
 + * Refactor Materialized View code (CASSANDRA-11475)
 + * Update Java Driver (CASSANDRA-11615)
 +Merged from 2.2:
   * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
   * Run CommitLog tests with different compression settings (CASSANDRA-9039)
   * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index 17062b4,e303801..99903fc
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -179,52 -131,23 +179,63 @@@ public class Vie
      }
  
      /**
 -      * Returns the sstables that have any partition between {@code left} and 
{@code right}, when both bounds are taken inclusively.
 -      * The interval formed by {@code left} and {@code right} shouldn't wrap.
 -      */
 -    public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition 
right)
 +     * Returns the sstables that have any partition between {@code left} and 
{@code right}, when both bounds are taken inclusively.
 +     * The interval formed by {@code left} and {@code right} shouldn't wrap.
 +     */
 +    public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, PartitionPosition left, PartitionPosition right)
      {
 -        return sstablesInBounds(left, right, intervalTree);
 +        assert !AbstractBounds.strictlyWrapsAround(left, right);
 +
 +        if (intervalTree.isEmpty())
 +            return Collections.emptyList();
 +
 +        PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
 +        return select(sstableSet, intervalTree.search(Interval.create(left, stopInTree)));
      }
  
 -    public static List<SSTableReader> sstablesInBounds(RowPosition left, 
RowPosition right, SSTableIntervalTree intervalTree)
++    public static List<SSTableReader> sstablesInBounds(PartitionPosition left, PartitionPosition right, SSTableIntervalTree intervalTree)
+     {
+         assert !AbstractBounds.strictlyWrapsAround(left, right);
+ 
+         if (intervalTree.isEmpty())
+             return Collections.emptyList();
+ 
 -        RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : 
right;
 -        return intervalTree.search(Interval.<RowPosition, 
SSTableReader>create(left, stopInTree));
++        PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() : right;
++        return intervalTree.search(Interval.create(left, stopInTree));
++    }
++
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet)
 +    {
 +        return (view) -> view.sstables(sstableSet);
 +    }
 +
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet, Predicate<SSTableReader> filter)
 +    {
 +        return (view) -> view.sstables(sstableSet, filter);
 +    }
 +
 +    /**
 +     * @return a ViewFragment containing the sstables and memtables that may 
need to be merged
 +     * for the given @param key, according to the interval tree
 +     */
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet, DecoratedKey key)
 +    {
 +        assert sstableSet == SSTableSet.LIVE;
 +        return (view) -> view.intervalTree.search(key);
 +    }
 +
 +    /**
 +     * @return a ViewFragment containing the sstables and memtables that may 
need to be merged
 +     * for rows within @param rowBounds, inclusive, according to the interval 
tree.
 +     */
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet, AbstractBounds<PartitionPosition> rowBounds)
 +    {
 +        // Note that View.sstablesInBounds always includes it's bound while rowBounds may not. This is ok however
 +        // because the fact we restrict the sstables returned by this function is an optimization in the first
 +        // place and the returned sstables will (almost) never cover *exactly* rowBounds anyway. It's also
 +        // *very* unlikely that a sstable is included *just* because we consider one of the bound inclusively
 +        // instead of exclusively, so the performance impact is negligible in practice.
 +        return (view) -> view.sstablesInBounds(sstableSet, rowBounds.left, 
rowBounds.right);
      }
  
      // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 2ed6ad1,f4c900e..d5c060e
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -25,10 -25,12 +25,13 @@@ import java.util.*
  import java.util.concurrent.*;
  import java.util.concurrent.atomic.AtomicBoolean;
  
+ import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.base.Function;
  import com.google.common.collect.*;
  
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
+ import org.apache.cassandra.db.lifecycle.SSTableIntervalTree;
+ import org.apache.cassandra.db.lifecycle.View;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -320,30 -319,33 +324,24 @@@ public class StreamSession implements I
          {
              for (ColumnFamilyStore cfStore : stores)
              {
 -                final List<Range<RowPosition>> keyRanges = new 
ArrayList<>(ranges.size());
 +                final List<Range<PartitionPosition>> keyRanges = new 
ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
                      keyRanges.add(Range.makeRowRange(range));
 -                refs.addAll(cfStore.selectAndReference(new Function<View, 
List<SSTableReader>>()
 -                {
 -                    public List<SSTableReader> apply(View view)
 +                refs.addAll(cfStore.selectAndReference(view -> {
 +                    Set<SSTableReader> sstables = Sets.newHashSet();
++                    SSTableIntervalTree intervalTree = SSTableIntervalTree.build(view.sstables(SSTableSet.CANONICAL));
 +                    for (Range<PartitionPosition> keyRange : keyRanges)
                      {
-                         // keyRange excludes its start, while sstableInBounds 
is inclusive (of both start and end).
-                         // This is fine however, because keyRange has been 
created from a token range through Range.makeRowRange (see above).
-                         // And that later method uses the Token.maxKeyBound() 
method to creates the range, which return a "fake" key that
-                         // sort after all keys having the token. That "fake" 
key cannot however be equal to any real key, so that even
-                         // including keyRange.left will still exclude any key 
having the token of the original token range, and so we're
-                         // still actually selecting what we wanted.
-                         for (SSTableReader sstable : 
view.sstablesInBounds(SSTableSet.CANONICAL, keyRange.left, keyRange.right))
 -                        SSTableIntervalTree intervalTree = 
SSTableIntervalTree.build(ColumnFamilyStore.CANONICAL_SSTABLES.apply(view));
 -                        Set<SSTableReader> sstables = Sets.newHashSet();
 -                        for (Range<RowPosition> keyRange : keyRanges)
++                        for (SSTableReader sstable : View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
                          {
-                             // sstableInBounds may contain early opened 
sstables
 -                            // keyRange excludes its start, while 
sstableInBounds is inclusive (of both start and end).
 -                            // This is fine however, because keyRange has 
been created from a token range through Range.makeRowRange (see above).
 -                            // And that later method uses the 
Token.maxKeyBound() method to creates the range, which return a "fake" key that
 -                            // sort after all keys having the token. That 
"fake" key cannot however be equal to any real key, so that even
 -                            // including keyRange.left will still exclude any 
key having the token of the original token range, and so we're
 -                            // still actually selecting what we wanted.
 -                            for (SSTableReader sstable : 
View.sstablesInBounds(keyRange.left, keyRange.right, intervalTree))
 -                            {
 -                                if (!isIncremental || !sstable.isRepaired())
 -                                    sstables.add(sstable);
 -                            }
 +                            if (!isIncremental || !sstable.isRepaired())
 +                                sstables.add(sstable);
                          }
 -
 -                        logger.debug("ViewFilter for {}/{} sstables", 
sstables.size(), view.sstables.size());
 -                        return ImmutableList.copyOf(sstables);
                      }
 +
 +                    if (logger.isDebugEnabled())
 +                        logger.debug("ViewFilter for {}/{} sstables", sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL)));
 +                    return sstables;
                  }).refs);
              }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/73a8341f/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
index 008df06,f50953a..18bc760
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@@ -20,48 -19,49 +20,42 @@@ package org.apache.cassandra.io.sstable
  
  import java.io.File;
  import java.io.IOException;
--import java.nio.ByteBuffer;
  import java.util.*;
- import java.util.concurrent.ThreadLocalRandom;
+ import java.util.concurrent.ExecutionException;
 -import java.util.concurrent.ThreadLocalRandom;
  import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicInteger;
  
  import com.google.common.collect.Iterables;
  import com.google.common.collect.Sets;
--import org.junit.After;
--import org.junit.AfterClass;
--import org.junit.BeforeClass;
  import com.google.common.util.concurrent.Uninterruptibles;
  import org.junit.Test;
  
--import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.Util;
 -import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.UpdateBuilder;
- import org.apache.cassandra.config.Config;
  import org.apache.cassandra.config.DatabaseDescriptor;
 -import org.apache.cassandra.config.KSMetaData;
 -import org.apache.cassandra.db.*;
 -import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.RowUpdateBuilder;
 +import org.apache.cassandra.db.SerializationHeader;
++import org.apache.cassandra.db.lifecycle.View;
 +import org.apache.cassandra.db.rows.EncodingStats;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
  import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
  import org.apache.cassandra.db.compaction.CompactionController;
 -import org.apache.cassandra.db.compaction.CompactionManager;
 -import org.apache.cassandra.db.compaction.LazilyCompactedRow;
 +import org.apache.cassandra.db.compaction.CompactionIterator;
  import org.apache.cassandra.db.compaction.OperationType;
 -import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.io.sstable.format.SSTableReader;
 -import org.apache.cassandra.io.sstable.format.SSTableWriter;
 -import org.apache.cassandra.locator.SimpleStrategy;
  import org.apache.cassandra.db.compaction.SSTableSplitter;
 -import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 +import org.apache.cassandra.db.partitions.ImmutableBTreePartition;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.db.lifecycle.SSTableSet;
- import org.apache.cassandra.db.lifecycle.View;
- import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
- import org.apache.cassandra.io.sstable.format.SSTableWriter;
 +import org.apache.cassandra.io.util.FileUtils;
 +import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
  import org.apache.cassandra.metrics.StorageMetrics;
- import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.notifications.INotification;
 -import org.apache.cassandra.notifications.INotificationConsumer;
 -import org.apache.cassandra.notifications.SSTableListChangedNotification;
 -import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.streaming.StreamSession;
  import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.Pair;

Reply via email to