Merge branch 'cassandra-2.2' into cassandra-3.0

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/99e0c907
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/99e0c907
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/99e0c907

Branch: refs/heads/trunk
Commit: 99e0c907eabc26f876f984daf33fdc2d3ab66a24
Parents: 507ed14 3aa7308
Author: Aleksey Yeschenko <alek...@apache.org>
Authored: Fri Aug 14 21:27:08 2015 +0300
Committer: Aleksey Yeschenko <alek...@apache.org>
Committed: Fri Aug 14 21:27:08 2015 +0300

----------------------------------------------------------------------
 .../apache/cassandra/db/ColumnFamilyStore.java  |  2 +-
 .../cassandra/db/SizeEstimatesRecorder.java     |  3 ++-
 .../org/apache/cassandra/db/lifecycle/View.java | 20 ++++++++++++----
 .../apache/cassandra/dht/AbstractBounds.java    | 25 ++++++++++++++++++++
 src/java/org/apache/cassandra/dht/Bounds.java   |  2 +-
 .../apache/cassandra/dht/ExcludingBounds.java   |  2 +-
 .../cassandra/dht/IncludingExcludingBounds.java |  2 +-
 .../cassandra/streaming/StreamSession.java      | 16 +++++++++----
 .../apache/cassandra/db/lifecycle/ViewTest.java |  8 ++++---
 9 files changed, 63 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 797e2c7,8bda6b2..8d72ecf
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1179,7 -1394,7 +1179,7 @@@ public class ColumnFamilyStore implemen
          Set<SSTableReader> results = null;
          for (SSTableReader sstable : sstables)
          {
-             Set<SSTableReader> overlaps = 
ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, 
AbstractBounds.bounds(sstable.first, true, sstable.last, true)));
 -            Set<SSTableReader> overlaps = 
ImmutableSet.copyOf(tree.search(Interval.<RowPosition, 
SSTableReader>create(sstable.first, sstable.last)));
++            Set<SSTableReader> overlaps = 
ImmutableSet.copyOf(view.sstablesInBounds(sstableSet, sstable.first, 
sstable.last));
              results = results == null ? overlaps : Sets.union(results, 
overlaps).immutableCopy();
          }
          results = Sets.difference(results, ImmutableSet.copyOf(sstables));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/SizeEstimatesRecorder.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/db/lifecycle/View.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/lifecycle/View.java
index 75590fa,73ba131..7ee0fdf
--- a/src/java/org/apache/cassandra/db/lifecycle/View.java
+++ b/src/java/org/apache/cassandra/db/lifecycle/View.java
@@@ -176,41 -126,19 +176,53 @@@ public class Vie
          return String.format("View(pending_count=%d, sstables=%s, 
compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, 
compacting);
      }
  
-     public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, 
AbstractBounds<PartitionPosition> rowBounds)
+     /**
 -      * Returns the sstables that have any partition between {@code left} and 
{@code right}, when both bounds are taken inclusively.
 -      * The interval formed by {@code left} and {@code right} shouldn't wrap.
 -      */
 -    public List<SSTableReader> sstablesInBounds(RowPosition left, RowPosition 
right)
++     * Returns the sstables that have any partition between {@code left} and 
{@code right}, when both bounds are taken inclusively.
++     * The interval formed by {@code left} and {@code right} shouldn't wrap.
++     */
++    public Iterable<SSTableReader> sstablesInBounds(SSTableSet sstableSet, 
PartitionPosition left, PartitionPosition right)
      {
+         assert !AbstractBounds.strictlyWrapsAround(left, right);
+ 
          if (intervalTree.isEmpty())
              return Collections.emptyList();
-         PartitionPosition stopInTree = rowBounds.right.isMinimum() ? 
intervalTree.max() : rowBounds.right;
-         return select(sstableSet, 
intervalTree.search(Interval.create(rowBounds.left, stopInTree)));
+ 
 -        RowPosition stopInTree = right.isMinimum() ? intervalTree.max() : 
right;
 -        return intervalTree.search(Interval.<RowPosition, 
SSTableReader>create(left, stopInTree));
++        PartitionPosition stopInTree = right.isMinimum() ? intervalTree.max() 
: right;
++        return select(sstableSet, intervalTree.search(Interval.create(left, 
stopInTree)));
 +    }
 +
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet)
 +    {
 +        return (view) -> view.sstables(sstableSet);
 +    }
 +
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet, Predicate<SSTableReader> filter)
 +    {
 +        return (view) -> view.sstables(sstableSet, filter);
 +    }
 +
 +    /**
 +     * @return a ViewFragment containing the sstables and memtables that may 
need to be merged
 +     * for the given @param key, according to the interval tree
 +     */
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet, DecoratedKey key)
 +    {
 +        assert sstableSet == SSTableSet.LIVE;
 +        return (view) -> view.intervalTree.search(key);
 +    }
 +
 +    /**
 +     * @return a ViewFragment containing the sstables and memtables that may 
need to be merged
 +     * for rows within @param rowBounds, inclusive, according to the interval 
tree.
 +     */
 +    public static Function<View, Iterable<SSTableReader>> select(SSTableSet 
sstableSet, AbstractBounds<PartitionPosition> rowBounds)
 +    {
-         return (view) -> view.sstablesInBounds(sstableSet, rowBounds);
++        // Note that View.sstablesInBounds always includes it's bound while 
rowBounds may not. This is ok however
++        // because the fact we restrict the sstables returned by this 
function is an optimization in the first
++        // place and the returned sstables will (almost) never cover 
*exactly* rowBounds anyway. It's also
++        // *very* unlikely that a sstable is included *just* because we 
consider one of the bound inclusively
++        // instead of exclusively, so the performance impact is negligible in 
practice.
++        return (view) -> view.sstablesInBounds(sstableSet, rowBounds.left, 
rowBounds.right);
      }
  
      // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/AbstractBounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/ExcludingBounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/dht/IncludingExcludingBounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/streaming/StreamSession.java
index 861528b,55d7e68..bb5be1e
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@@ -35,8 -35,7 +35,7 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStore;
  import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.db.RowPosition;
 +import org.apache.cassandra.db.PartitionPosition;
- import org.apache.cassandra.dht.AbstractBounds;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
  import org.apache.cassandra.gms.*;
@@@ -320,23 -313,40 +319,30 @@@ public class StreamSession implements I
          {
              for (ColumnFamilyStore cfStore : stores)
              {
-                 final List<AbstractBounds<PartitionPosition>> rowBoundsList = 
new ArrayList<>(ranges.size());
 -                final List<Range<RowPosition>> keyRanges = new 
ArrayList<>(ranges.size());
++                final List<Range<PartitionPosition>> keyRanges = new 
ArrayList<>(ranges.size());
                  for (Range<Token> range : ranges)
-                     rowBoundsList.add(Range.makeRowRange(range));
+                     keyRanges.add(Range.makeRowRange(range));
 -                refs.addAll(cfStore.selectAndReference(new Function<View, 
List<SSTableReader>>()
 -                {
 -                    public List<SSTableReader> apply(View view)
 +                refs.addAll(cfStore.selectAndReference(view -> {
 +                    Set<SSTableReader> sstables = Sets.newHashSet();
-                     for (AbstractBounds<PartitionPosition> rowBounds : 
rowBoundsList)
++                    for (Range<PartitionPosition> keyRange : keyRanges)
                      {
-                         for (SSTableReader sstable : 
view.sstablesInBounds(SSTableSet.CANONICAL, rowBounds))
 -                        Map<SSTableReader, SSTableReader> permittedInstances 
= new HashMap<>();
 -                        for (SSTableReader reader : 
ColumnFamilyStore.CANONICAL_SSTABLES.apply(view))
 -                            permittedInstances.put(reader, reader);
 -
 -                        Set<SSTableReader> sstables = Sets.newHashSet();
 -                        for (Range<RowPosition> keyRange : keyRanges)
++                        // keyRange excludes its start, while sstableInBounds 
is inclusive (of both start and end).
++                        // This is fine however, because keyRange has been 
created from a token range through Range.makeRowRange (see above).
++                        // And that later method uses the Token.maxKeyBound() 
method to creates the range, which return a "fake" key that
++                        // sort after all keys having the token. That "fake" 
key cannot however be equal to any real key, so that even
++                        // including keyRange.left will still exclude any key 
having the token of the original token range, and so we're
++                        // still actually selecting what we wanted.
++                        for (SSTableReader sstable : 
view.sstablesInBounds(SSTableSet.CANONICAL, keyRange.left, keyRange.right))
                          {
 -                            // keyRange excludes its start, while 
sstableInBounds is inclusive (of both start and end).
 -                            // This is fine however, because keyRange has 
been created from a token range through Range.makeRowRange (see above).
 -                            // And that later method uses the 
Token.maxKeyBound() method to creates the range, which return a "fake" key that
 -                            // sort after all keys having the token. That 
"fake" key cannot however be equal to any real key, so that even
 -                            // including keyRange.left will still exclude any 
key having the token of the original token range, and so we're
 -                            // still actually selecting what we wanted.
 -                            for (SSTableReader sstable : 
view.sstablesInBounds(keyRange.left, keyRange.right))
 -                            {
 -                                // sstableInBounds may contain early opened 
sstables
 -                                if (isIncremental && sstable.isRepaired())
 -                                    continue;
 -                                sstable = permittedInstances.get(sstable);
 -                                if (sstable != null)
 -                                    sstables.add(sstable);
 -                            }
++                            // sstableInBounds may contain early opened 
sstables
 +                            if (!isIncremental || !sstable.isRepaired())
 +                                sstables.add(sstable);
                          }
 -
 -                        logger.debug("ViewFilter for {}/{} sstables", 
sstables.size(), view.sstables.size());
 -                        return ImmutableList.copyOf(sstables);
                      }
 +
 +                    if (logger.isDebugEnabled())
 +                        logger.debug("ViewFilter for {}/{} sstables", 
sstables.size(), Iterables.size(view.sstables(SSTableSet.CANONICAL)));
 +                    return sstables;
                  }).refs);
              }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/99e0c907/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
index 27d426a,32a81e2..40afa54
--- a/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/ViewTest.java
@@@ -61,15 -59,16 +61,17 @@@ public class ViewTes
          {
              for (int j = i ; j < 5 ; j++)
              {
 -                RowPosition min = MockSchema.readerBounds(i);
 -                RowPosition max = MockSchema.readerBounds(j);
 +                PartitionPosition min = MockSchema.readerBounds(i);
 +                PartitionPosition max = MockSchema.readerBounds(j);
-                 for (boolean minInc : new boolean[] { true, false} )
+                 for (boolean minInc : new boolean[] { true })//, false} )
                  {
-                     for (boolean maxInc : new boolean[] { true, false} )
+                     for (boolean maxInc : new boolean[] { true })//, false} )
                      {
                          if (i == j && !(minInc && maxInc))
                              continue;
-                         List<SSTableReader> r = 
ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE, 
AbstractBounds.bounds(min, minInc, max, maxInc)));
 -                        AbstractBounds<RowPosition> bounds = 
AbstractBounds.bounds(min, minInc, max, maxInc);
 -                        List<SSTableReader> r = 
initialView.sstablesInBounds(bounds.left, bounds.right);
++
++                        AbstractBounds<PartitionPosition> bounds = 
AbstractBounds.bounds(min, minInc, max, maxInc);
++                        List<SSTableReader> r = 
ImmutableList.copyOf(initialView.sstablesInBounds(SSTableSet.LIVE,bounds.left, 
bounds.right));
                          Assert.assertEquals(String.format("%d(%s) %d(%s)", i, 
minInc, j, maxInc), j - i + (minInc ? 0 : -1) + (maxInc ? 1 : 0), r.size());
                      }
                  }

Reply via email to