Merge branch 'cassandra-3.0' into trunk

Conflicts:
        src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
        src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
        src/java/org/apache/cassandra/service/pager/QueryPager.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/928e4c28
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/928e4c28
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/928e4c28

Branch: refs/heads/trunk
Commit: 928e4c28c63e0142979d097ec57110dbc1046d67
Parents: 73c4826 6094974
Author: Benedict Elliott Smith <[email protected]>
Authored: Mon Oct 26 21:08:07 2015 +0000
Committer: Benedict Elliott Smith <[email protected]>
Committed: Mon Oct 26 21:08:07 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/EmptyIterators.java | 214 ++++++++++++
 .../cassandra/db/PartitionRangeReadCommand.java |  10 +-
 .../org/apache/cassandra/db/ReadCommand.java    | 170 +++++-----
 src/java/org/apache/cassandra/db/ReadQuery.java |   4 +-
 .../db/SinglePartitionNamesCommand.java         |   2 +-
 .../db/SinglePartitionSliceCommand.java         |   2 +-
 .../db/compaction/CompactionIterator.java       |  17 +-
 .../db/filter/ClusteringIndexNamesFilter.java   |  10 +-
 .../db/filter/ClusteringIndexSliceFilter.java   |  19 +-
 .../apache/cassandra/db/filter/DataLimits.java  | 185 ++++++++---
 .../apache/cassandra/db/filter/RowFilter.java   |  45 +--
 .../AlteringUnfilteredPartitionIterator.java    |  72 ----
 .../db/partitions/BasePartitionIterator.java    |  27 ++
 .../partitions/CountingPartitionIterator.java   |  58 ----
 .../db/partitions/CountingRowIterator.java      |  58 ----
 .../CountingUnfilteredPartitionIterator.java    |  52 ---
 .../CountingUnfilteredRowIterator.java          |  64 ----
 .../db/partitions/PartitionIterator.java        |   3 +-
 .../db/partitions/PartitionIterators.java       | 102 ++----
 .../cassandra/db/partitions/PurgeFunction.java  | 120 +++++++
 .../db/partitions/PurgingPartitionIterator.java | 156 ---------
 .../partitions/UnfilteredPartitionIterator.java |   6 +-
 .../UnfilteredPartitionIterators.java           | 125 ++-----
 .../partitions/WrappingPartitionIterator.java   |  50 ---
 .../WrappingUnfilteredPartitionIterator.java    | 126 -------
 .../db/rows/AlteringUnfilteredRowIterator.java  |  98 ------
 .../cassandra/db/rows/BaseRowIterator.java      |  64 ++++
 .../apache/cassandra/db/rows/RowIterator.java   |  32 +-
 .../apache/cassandra/db/rows/RowIterators.java  |  68 +---
 .../db/rows/UnfilteredRowIterator.java          |  32 +-
 .../rows/UnfilteredRowIteratorSerializer.java   |   2 +-
 .../db/rows/UnfilteredRowIterators.java         | 215 +++---------
 .../cassandra/db/rows/WrappingRowIterator.java  |  79 -----
 .../db/rows/WrappingUnfilteredRowIterator.java  |   2 +-
 .../cassandra/db/transform/BaseIterator.java    | 129 ++++++++
 .../cassandra/db/transform/BasePartitions.java  | 100 ++++++
 .../apache/cassandra/db/transform/BaseRows.java | 139 ++++++++
 .../apache/cassandra/db/transform/Filter.java   |  56 ++++
 .../db/transform/FilteredPartitions.java        |  40 +++
 .../cassandra/db/transform/FilteredRows.java    |  40 +++
 .../cassandra/db/transform/MoreContents.java    |   8 +
 .../cassandra/db/transform/MorePartitions.java  |  35 ++
 .../apache/cassandra/db/transform/MoreRows.java |  36 ++
 .../apache/cassandra/db/transform/Stack.java    |  81 +++++
 .../db/transform/StoppingTransformation.java    |  60 ++++
 .../cassandra/db/transform/Transformation.java  | 145 +++++++++
 .../db/transform/UnfilteredPartitions.java      |  27 ++
 .../cassandra/db/transform/UnfilteredRows.java  |  40 +++
 .../cassandra/index/SecondaryIndexBuilder.java  |   9 +-
 .../internal/composites/CompositesSearcher.java |  24 +-
 .../io/sstable/ReducingKeyIterator.java         |   5 +-
 .../io/sstable/format/big/BigTableWriter.java   |  29 +-
 .../apache/cassandra/service/DataResolver.java  | 106 +++---
 .../apache/cassandra/service/StorageProxy.java  |  10 +-
 .../service/pager/AbstractQueryPager.java       | 111 +++----
 .../service/pager/MultiPartitionPager.java      |  14 +-
 .../cassandra/service/pager/QueryPager.java     |   6 +-
 .../cassandra/service/pager/QueryPagers.java    |   7 +-
 .../cassandra/thrift/ThriftResultsMerger.java   |  26 +-
 .../cassandra/utils/CloseableIterator.java      |   3 +-
 .../org/apache/cassandra/utils/Throwables.java  |   9 +-
 .../Keyspace1-Standard1-jb-0-Summary.db         | Bin 202 -> 162 bytes
 test/unit/org/apache/cassandra/Util.java        |   4 +-
 .../apache/cassandra/db/TransformerTest.java    | 325 +++++++++++++++++++
 .../apache/cassandra/repair/ValidatorTest.java  |   7 +-
 .../cassandra/service/DataResolverTest.java     |   6 +-
 67 files changed, 2226 insertions(+), 1701 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02f6584,12f62f7..c28d3bd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
 +3.2
 + * Added graphing option to cassandra-stress (CASSANDRA-7918)
 + * Abort in-progress queries that time out (CASSANDRA-7392)
 + * Add transparent data encryption core classes (CASSANDRA-9945)
 +
 +
  3.0
+  * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
   * Remove token generator (CASSANDRA-5261)
   * RolesCache should not be created for any authenticator that does not requireAuthentication (CASSANDRA-10562)
   * Fix LogTransaction checking only a single directory for files (CASSANDRA-10421)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 1ed8bb4,4d9b65b..f50a8cf
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -28,9 -28,9 +28,11 @@@ import org.slf4j.LoggerFactory
  import org.apache.cassandra.config.*;
  import org.apache.cassandra.cql3.Operator;
  import org.apache.cassandra.db.filter.*;
 +import org.apache.cassandra.db.monitoring.MonitorableImpl;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
++import org.apache.cassandra.db.transform.StoppingTransformation;
+ import org.apache.cassandra.db.transform.Transformation;
  import org.apache.cassandra.dht.AbstractBounds;
  import org.apache.cassandra.index.Index;
  import org.apache.cassandra.io.IVersionedSerializer;
@@@ -446,75 -410,60 +414,100 @@@ public abstract class ReadCommand exten
              }
  
              @Override
-             public void close()
+             public Row applyToRow(Row row)
              {
-                 try
+                 if (row.hasLiveData(ReadCommand.this.nowInSec()))
+                     ++liveRows;
+ 
+                 for (Cell cell : row.cells())
                  {
-                     super.close();
+                     if (!cell.isLive(ReadCommand.this.nowInSec()))
+                         countTombstone(row.clustering());
                  }
-                 finally
-                 {
-                     recordLatency(metric, System.nanoTime() - startTimeNanos);
- 
-                     metric.tombstoneScannedHistogram.update(tombstones);
-                     metric.liveScannedHistogram.update(liveRows);
+                 return row;
+             }
  
-                     boolean warnTombstones = tombstones > warningThreshold && 
respectTombstoneThresholds;
-                     if (warnTombstones)
-                     {
-                         String msg = String.format("Read %d live rows and %d 
tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, 
tombstones, ReadCommand.this.toCQLString());
-                         ClientWarn.warn(msg);
-                         logger.warn(msg);
-                     }
+             @Override
+             public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+             {
+                 countTombstone(marker.clustering());
+                 return marker;
+             }
  
-                     Tracing.trace("Read {} live and {} tombstone cells{}", 
liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : 
""));
+             private void countTombstone(ClusteringPrefix clustering)
+             {
+                 ++tombstones;
+                 if (tombstones > failureThreshold && respectTombstoneThresholds)
+                 {
+                     String query = ReadCommand.this.toCQLString();
+                     Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
+                     throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
                  }
              }
-         };
-     }
  
-     protected UnfilteredPartitionIterator 
withStateTracking(UnfilteredPartitionIterator iter)
-     {
-         return new WrappingUnfilteredPartitionIterator(iter)
-         {
              @Override
-             public UnfilteredRowIterator computeNext(UnfilteredRowIterator 
iter)
+             public void onClose()
              {
-                 if (isAborted())
-                     return null;
+                 recordLatency(metric, System.nanoTime() - startTimeNanos);
  
-                 if (TEST_ITERATION_DELAY_MILLIS > 0)
-                     maybeDelayForTesting();
+                 metric.tombstoneScannedHistogram.update(tombstones);
+                 metric.liveScannedHistogram.update(liveRows);
  
-                 return iter;
+                 boolean warnTombstones = tombstones > warningThreshold && 
respectTombstoneThresholds;
+                 if (warnTombstones)
+                 {
+                     String msg = String.format("Read %d live rows and %d 
tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, 
tombstones, ReadCommand.this.toCQLString());
+                     ClientWarn.warn(msg);
+                     logger.warn(msg);
+                 }
+ 
+                 Tracing.trace("Read {} live and {} tombstone cells{}", 
liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : 
""));
              }
          };
+ 
+         return Transformation.apply(iter, new MetricRecording());
      }
  
-     protected UnfilteredRowIterator withStateTracking(UnfilteredRowIterator 
iter)
++    protected class CheckForAbort extends StoppingTransformation<BaseRowIterator<?>>
 +    {
-         return new WrappingUnfilteredRowIterator(iter)
++        protected BaseRowIterator<?> applyToPartition(BaseRowIterator partition)
 +        {
-             @Override
-             public boolean hasNext()
-             {
-                 if (isAborted())
-                     return false;
++            maybeAbort();
++            return partition;
++        }
 +
-                 if (TEST_ITERATION_DELAY_MILLIS > 0)
-                     maybeDelayForTesting();
++        protected Row applyToRow(Row row)
++        {
++            maybeAbort();
++            return row;
++        }
 +
-                 return super.hasNext();
-             }
-         };
++        private void maybeAbort()
++        {
++            if (isAborted())
++                stop();
++
++            if (TEST_ITERATION_DELAY_MILLIS > 0)
++                maybeDelayForTesting();
++        }
++    }
++
++    protected UnfilteredPartitionIterator withStateTracking(UnfilteredPartitionIterator iter)
++    {
++        return Transformation.apply(iter, new CheckForAbort());
++    }
++
++    protected UnfilteredRowIterator withStateTracking(UnfilteredRowIterator iter)
++    {
++        return Transformation.apply(iter, new CheckForAbort());
 +    }
 +
 +    private void maybeDelayForTesting()
 +    {
 +        if (!metadata.ksName.startsWith("system"))
 +            FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
 +    }
 +
      /**
       * Creates a message for this command.
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadQuery.java
index 2b5c09c,178ca7c..ba7b893
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@@ -42,12 -42,12 +42,12 @@@ public interface ReadQuer
  
          public PartitionIterator execute(ConsistencyLevel consistency, 
ClientState clientState) throws RequestExecutionException
          {
-             return PartitionIterators.EMPTY;
+             return EmptyIterators.partition();
          }
  
 -        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
 +        public PartitionIterator executeInternal(ReadExecutionController 
controller)
          {
-             return PartitionIterators.EMPTY;
+             return EmptyIterators.partition();
          }
  
          public DataLimits limits()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index de4e54b,2599b8d..386d7ae
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -61,33 -60,33 +60,33 @@@ abstract class AbstractQueryPager imple
      public PartitionIterator fetchPage(int pageSize, ConsistencyLevel 
consistency, ClientState clientState) throws RequestValidationException, 
RequestExecutionException
      {
          if (isExhausted())
-             return PartitionIterators.EMPTY;
+             return EmptyIterators.partition();
  
          pageSize = Math.min(pageSize, remaining);
-         return new 
PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState), 
limits.forPaging(pageSize), command.nowInSec());
+         Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
+         return 
Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, 
clientState), pager);
      }
  
 -    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup 
orderGroup) throws RequestValidationException, RequestExecutionException
 +    public PartitionIterator fetchPageInternal(int pageSize, 
ReadExecutionController executionController) throws RequestValidationException, 
RequestExecutionException
      {
          if (isExhausted())
-             return PartitionIterators.EMPTY;
+             return EmptyIterators.partition();
  
          pageSize = Math.min(pageSize, remaining);
-         return new 
PagerIterator(nextPageReadCommand(pageSize).executeInternal(executionController),
 limits.forPaging(pageSize), command.nowInSec());
+         Pager pager = new Pager(limits.forPaging(pageSize), 
command.nowInSec());
 -        return 
Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup), 
pager);
++        return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager);
      }
  
-     private class PagerIterator extends CountingPartitionIterator
+     private class Pager extends Transformation<RowIterator>
      {
          private final DataLimits pageLimits;
- 
+         private final DataLimits.Counter counter;
          private Row lastRow;
- 
          private boolean isFirstPartition = true;
-         private RowIterator nextPartition;
  
-         private PagerIterator(PartitionIterator iter, DataLimits pageLimits, 
int nowInSec)
+         private Pager(DataLimits pageLimits, int nowInSec)
          {
-             super(iter, pageLimits, nowInSec);
+             this.counter = pageLimits.newCounter(nowInSec, true);
              this.pageLimits = pageLimits;
          }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index fca0165,8caa14d..922df2e
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@@ -131,13 -129,13 +129,13 @@@ public class MultiPartitionPager implem
      }
  
      @SuppressWarnings("resource") // iter closed via countingIter
 -    public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup 
orderGroup) throws RequestValidationException, RequestExecutionException
 +    public PartitionIterator fetchPageInternal(int pageSize, 
ReadExecutionController executionController) throws RequestValidationException, 
RequestExecutionException
      {
          int toQuery = Math.min(remaining, pageSize);
 -        PagersIterator iter = new PagersIterator(toQuery, null, null, 
orderGroup);
 +        PagersIterator iter = new PagersIterator(toQuery, null, null, 
executionController);
-         CountingPartitionIterator countingIter = new 
CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
-         iter.setCounter(countingIter.counter());
-         return countingIter;
+         DataLimits.Counter counter = limit.forPaging(toQuery).newCounter(nowInSec, true);
+         iter.setCounter(counter);
+         return counter.applyTo(iter);
      }
  
      private class PagersIterator extends AbstractIterator<RowIterator> 
implements PartitionIterator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/QueryPager.java
index 1d5a739,cdf2b97..e2d7f5e
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@@ -18,9 -18,9 +18,9 @@@
  package org.apache.cassandra.service.pager;
  
  import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.ReadExecutionController;
+ import org.apache.cassandra.db.EmptyIterators;
 -import org.apache.cassandra.db.ReadOrderGroup;
  import org.apache.cassandra.db.partitions.PartitionIterator;
- import org.apache.cassandra.db.partitions.PartitionIterators;
  import org.apache.cassandra.exceptions.RequestExecutionException;
  import org.apache.cassandra.exceptions.RequestValidationException;
  import org.apache.cassandra.service.ClientState;
@@@ -55,12 -55,12 +55,12 @@@ public interface QueryPage
  
          public PartitionIterator fetchPage(int pageSize, ConsistencyLevel 
consistency, ClientState clientState) throws RequestValidationException, 
RequestExecutionException
          {
-             return PartitionIterators.EMPTY;
+             return EmptyIterators.partition();
          }
  
 -        public PartitionIterator fetchPageInternal(int pageSize, 
ReadOrderGroup orderGroup) throws RequestValidationException, 
RequestExecutionException
 +        public PartitionIterator fetchPageInternal(int pageSize, 
ReadExecutionController executionController) throws RequestValidationException, 
RequestExecutionException
          {
-             return PartitionIterators.EMPTY;
+             return EmptyIterators.partition();
          }
  
          public boolean isExhausted()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------

Reply via email to