Merge branch 'cassandra-3.0' into trunk
Conflicts:
src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
src/java/org/apache/cassandra/service/pager/QueryPager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/928e4c28
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/928e4c28
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/928e4c28
Branch: refs/heads/trunk
Commit: 928e4c28c63e0142979d097ec57110dbc1046d67
Parents: 73c4826 6094974
Author: Benedict Elliott Smith <[email protected]>
Authored: Mon Oct 26 21:08:07 2015 +0000
Committer: Benedict Elliott Smith <[email protected]>
Committed: Mon Oct 26 21:08:07 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/EmptyIterators.java | 214 ++++++++++++
.../cassandra/db/PartitionRangeReadCommand.java | 10 +-
.../org/apache/cassandra/db/ReadCommand.java | 170 +++++-----
src/java/org/apache/cassandra/db/ReadQuery.java | 4 +-
.../db/SinglePartitionNamesCommand.java | 2 +-
.../db/SinglePartitionSliceCommand.java | 2 +-
.../db/compaction/CompactionIterator.java | 17 +-
.../db/filter/ClusteringIndexNamesFilter.java | 10 +-
.../db/filter/ClusteringIndexSliceFilter.java | 19 +-
.../apache/cassandra/db/filter/DataLimits.java | 185 ++++++++---
.../apache/cassandra/db/filter/RowFilter.java | 45 +--
.../AlteringUnfilteredPartitionIterator.java | 72 ----
.../db/partitions/BasePartitionIterator.java | 27 ++
.../partitions/CountingPartitionIterator.java | 58 ----
.../db/partitions/CountingRowIterator.java | 58 ----
.../CountingUnfilteredPartitionIterator.java | 52 ---
.../CountingUnfilteredRowIterator.java | 64 ----
.../db/partitions/PartitionIterator.java | 3 +-
.../db/partitions/PartitionIterators.java | 102 ++----
.../cassandra/db/partitions/PurgeFunction.java | 120 +++++++
.../db/partitions/PurgingPartitionIterator.java | 156 ---------
.../partitions/UnfilteredPartitionIterator.java | 6 +-
.../UnfilteredPartitionIterators.java | 125 ++-----
.../partitions/WrappingPartitionIterator.java | 50 ---
.../WrappingUnfilteredPartitionIterator.java | 126 -------
.../db/rows/AlteringUnfilteredRowIterator.java | 98 ------
.../cassandra/db/rows/BaseRowIterator.java | 64 ++++
.../apache/cassandra/db/rows/RowIterator.java | 32 +-
.../apache/cassandra/db/rows/RowIterators.java | 68 +---
.../db/rows/UnfilteredRowIterator.java | 32 +-
.../rows/UnfilteredRowIteratorSerializer.java | 2 +-
.../db/rows/UnfilteredRowIterators.java | 215 +++---------
.../cassandra/db/rows/WrappingRowIterator.java | 79 -----
.../db/rows/WrappingUnfilteredRowIterator.java | 2 +-
.../cassandra/db/transform/BaseIterator.java | 129 ++++++++
.../cassandra/db/transform/BasePartitions.java | 100 ++++++
.../apache/cassandra/db/transform/BaseRows.java | 139 ++++++++
.../apache/cassandra/db/transform/Filter.java | 56 ++++
.../db/transform/FilteredPartitions.java | 40 +++
.../cassandra/db/transform/FilteredRows.java | 40 +++
.../cassandra/db/transform/MoreContents.java | 8 +
.../cassandra/db/transform/MorePartitions.java | 35 ++
.../apache/cassandra/db/transform/MoreRows.java | 36 ++
.../apache/cassandra/db/transform/Stack.java | 81 +++++
.../db/transform/StoppingTransformation.java | 60 ++++
.../cassandra/db/transform/Transformation.java | 145 +++++++++
.../db/transform/UnfilteredPartitions.java | 27 ++
.../cassandra/db/transform/UnfilteredRows.java | 40 +++
.../cassandra/index/SecondaryIndexBuilder.java | 9 +-
.../internal/composites/CompositesSearcher.java | 24 +-
.../io/sstable/ReducingKeyIterator.java | 5 +-
.../io/sstable/format/big/BigTableWriter.java | 29 +-
.../apache/cassandra/service/DataResolver.java | 106 +++---
.../apache/cassandra/service/StorageProxy.java | 10 +-
.../service/pager/AbstractQueryPager.java | 111 +++----
.../service/pager/MultiPartitionPager.java | 14 +-
.../cassandra/service/pager/QueryPager.java | 6 +-
.../cassandra/service/pager/QueryPagers.java | 7 +-
.../cassandra/thrift/ThriftResultsMerger.java | 26 +-
.../cassandra/utils/CloseableIterator.java | 3 +-
.../org/apache/cassandra/utils/Throwables.java | 9 +-
.../Keyspace1-Standard1-jb-0-Summary.db | Bin 202 -> 162 bytes
test/unit/org/apache/cassandra/Util.java | 4 +-
.../apache/cassandra/db/TransformerTest.java | 325 +++++++++++++++++++
.../apache/cassandra/repair/ValidatorTest.java | 7 +-
.../cassandra/service/DataResolverTest.java | 6 +-
67 files changed, 2226 insertions(+), 1701 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 02f6584,12f62f7..c28d3bd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,5 +1,11 @@@
+3.2
+ * Added graphing option to cassandra-stress (CASSANDRA-7918)
+ * Abort in-progress queries that time out (CASSANDRA-7392)
+ * Add transparent data encryption core classes (CASSANDRA-9945)
+
+
3.0
+ * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
* Remove token generator (CASSANDRA-5261)
* RolesCache should not be created for any authenticator that does not
requireAuthentication (CASSANDRA-10562)
* Fix LogTransaction checking only a single directory for files
(CASSANDRA-10421)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index 1ed8bb4,4d9b65b..f50a8cf
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -28,9 -28,9 +28,11 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.config.*;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
++import org.apache.cassandra.db.transform.StoppingTransformation;
+ import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.io.IVersionedSerializer;
@@@ -446,75 -410,60 +414,100 @@@ public abstract class ReadCommand exten
}
@Override
- public void close()
+ public Row applyToRow(Row row)
{
- try
+ if (row.hasLiveData(ReadCommand.this.nowInSec()))
+ ++liveRows;
+
+ for (Cell cell : row.cells())
{
- super.close();
+ if (!cell.isLive(ReadCommand.this.nowInSec()))
+ countTombstone(row.clustering());
}
- finally
- {
- recordLatency(metric, System.nanoTime() - startTimeNanos);
-
- metric.tombstoneScannedHistogram.update(tombstones);
- metric.liveScannedHistogram.update(liveRows);
+ return row;
+ }
- boolean warnTombstones = tombstones > warningThreshold &&
respectTombstoneThresholds;
- if (warnTombstones)
- {
- String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
- ClientWarn.warn(msg);
- logger.warn(msg);
- }
+ @Override
+ public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker
marker)
+ {
+ countTombstone(marker.clustering());
+ return marker;
+ }
- Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
+ private void countTombstone(ClusteringPrefix clustering)
+ {
+ ++tombstones;
+ if (tombstones > failureThreshold &&
respectTombstoneThresholds)
+ {
+ String query = ReadCommand.this.toCQLString();
+ Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
+ throw new TombstoneOverwhelmingException(tombstones,
query, ReadCommand.this.metadata(), currentKey, clustering);
}
}
- };
- }
- protected UnfilteredPartitionIterator
withStateTracking(UnfilteredPartitionIterator iter)
- {
- return new WrappingUnfilteredPartitionIterator(iter)
- {
@Override
- public UnfilteredRowIterator computeNext(UnfilteredRowIterator
iter)
+ public void onClose()
{
- if (isAborted())
- return null;
+ recordLatency(metric, System.nanoTime() - startTimeNanos);
- if (TEST_ITERATION_DELAY_MILLIS > 0)
- maybeDelayForTesting();
+ metric.tombstoneScannedHistogram.update(tombstones);
+ metric.liveScannedHistogram.update(liveRows);
- return iter;
+ boolean warnTombstones = tombstones > warningThreshold &&
respectTombstoneThresholds;
+ if (warnTombstones)
+ {
+ String msg = String.format("Read %d live rows and %d tombstone cells for query %1.512s (see tombstone_warn_threshold)", liveRows, tombstones, ReadCommand.this.toCQLString());
+ ClientWarn.warn(msg);
+ logger.warn(msg);
+ }
+
+ Tracing.trace("Read {} live and {} tombstone cells{}", liveRows, tombstones, (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
}
};
+
+ return Transformation.apply(iter, new MetricRecording());
}
- protected UnfilteredRowIterator withStateTracking(UnfilteredRowIterator
iter)
++ protected class CheckForAbort extends
StoppingTransformation<BaseRowIterator<?>>
+ {
- return new WrappingUnfilteredRowIterator(iter)
++ protected BaseRowIterator<?> applyToPartition(BaseRowIterator
partition)
+ {
- @Override
- public boolean hasNext()
- {
- if (isAborted())
- return false;
++ maybeAbort();
++ return partition;
++ }
+
- if (TEST_ITERATION_DELAY_MILLIS > 0)
- maybeDelayForTesting();
++ protected Row applyToRow(Row row)
++ {
++ maybeAbort();
++ return row;
++ }
+
- return super.hasNext();
- }
- };
++ private void maybeAbort()
++ {
++ if (isAborted())
++ stop();
++
++ if (TEST_ITERATION_DELAY_MILLIS > 0)
++ maybeDelayForTesting();
++ }
++ }
++
++ protected UnfilteredPartitionIterator
withStateTracking(UnfilteredPartitionIterator iter)
++ {
++ return Transformation.apply(iter, new CheckForAbort());
++ }
++
++ protected UnfilteredRowIterator withStateTracking(UnfilteredRowIterator
iter)
++ {
++ return Transformation.apply(iter, new CheckForAbort());
+ }
+
+ private void maybeDelayForTesting()
+ {
+ if (!metadata.ksName.startsWith("system"))
+ FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
+ }
+
/**
* Creates a message for this command.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ReadQuery.java
index 2b5c09c,178ca7c..ba7b893
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@@ -42,12 -42,12 +42,12 @@@ public interface ReadQuer
public PartitionIterator execute(ConsistencyLevel consistency,
ClientState clientState) throws RequestExecutionException
{
- return PartitionIterators.EMPTY;
+ return EmptyIterators.partition();
}
- public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+ public PartitionIterator executeInternal(ReadExecutionController
controller)
{
- return PartitionIterators.EMPTY;
+ return EmptyIterators.partition();
}
public DataLimits limits()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/SinglePartitionSliceCommand.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/db/filter/ClusteringIndexNamesFilter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index de4e54b,2599b8d..386d7ae
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@@ -61,33 -60,33 +60,33 @@@ abstract class AbstractQueryPager imple
public PartitionIterator fetchPage(int pageSize, ConsistencyLevel
consistency, ClientState clientState) throws RequestValidationException,
RequestExecutionException
{
if (isExhausted())
- return PartitionIterators.EMPTY;
+ return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- return new
PagerIterator(nextPageReadCommand(pageSize).execute(consistency, clientState),
limits.forPaging(pageSize), command.nowInSec());
+ Pager pager = new Pager(limits.forPaging(pageSize),
command.nowInSec());
+ return
Transformation.apply(nextPageReadCommand(pageSize).execute(consistency,
clientState), pager);
}
- public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup
orderGroup) throws RequestValidationException, RequestExecutionException
+ public PartitionIterator fetchPageInternal(int pageSize,
ReadExecutionController executionController) throws RequestValidationException,
RequestExecutionException
{
if (isExhausted())
- return PartitionIterators.EMPTY;
+ return EmptyIterators.partition();
pageSize = Math.min(pageSize, remaining);
- return new
PagerIterator(nextPageReadCommand(pageSize).executeInternal(executionController),
limits.forPaging(pageSize), command.nowInSec());
+ Pager pager = new Pager(limits.forPaging(pageSize),
command.nowInSec());
- return
Transformation.apply(nextPageReadCommand(pageSize).executeInternal(orderGroup),
pager);
++ return
Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController),
pager);
}
- private class PagerIterator extends CountingPartitionIterator
+ private class Pager extends Transformation<RowIterator>
{
private final DataLimits pageLimits;
-
+ private final DataLimits.Counter counter;
private Row lastRow;
-
private boolean isFirstPartition = true;
- private RowIterator nextPartition;
- private PagerIterator(PartitionIterator iter, DataLimits pageLimits,
int nowInSec)
+ private Pager(DataLimits pageLimits, int nowInSec)
{
- super(iter, pageLimits, nowInSec);
+ this.counter = pageLimits.newCounter(nowInSec, true);
this.pageLimits = pageLimits;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index fca0165,8caa14d..922df2e
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@@ -131,13 -129,13 +129,13 @@@ public class MultiPartitionPager implem
}
@SuppressWarnings("resource") // iter closed via countingIter
- public PartitionIterator fetchPageInternal(int pageSize, ReadOrderGroup
orderGroup) throws RequestValidationException, RequestExecutionException
+ public PartitionIterator fetchPageInternal(int pageSize,
ReadExecutionController executionController) throws RequestValidationException,
RequestExecutionException
{
int toQuery = Math.min(remaining, pageSize);
- PagersIterator iter = new PagersIterator(toQuery, null, null,
orderGroup);
+ PagersIterator iter = new PagersIterator(toQuery, null, null,
executionController);
- CountingPartitionIterator countingIter = new
CountingPartitionIterator(iter, limit.forPaging(toQuery), nowInSec);
- iter.setCounter(countingIter.counter());
- return countingIter;
+ DataLimits.Counter counter =
limit.forPaging(toQuery).newCounter(nowInSec, true);
+ iter.setCounter(counter);
+ return counter.applyTo(iter);
}
private class PagersIterator extends AbstractIterator<RowIterator>
implements PartitionIterator
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/pager/QueryPager.java
index 1d5a739,cdf2b97..e2d7f5e
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@@ -18,9 -18,9 +18,9 @@@
package org.apache.cassandra.service.pager;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.ReadExecutionController;
+ import org.apache.cassandra.db.EmptyIterators;
-import org.apache.cassandra.db.ReadOrderGroup;
import org.apache.cassandra.db.partitions.PartitionIterator;
- import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.service.ClientState;
@@@ -55,12 -55,12 +55,12 @@@ public interface QueryPage
public PartitionIterator fetchPage(int pageSize, ConsistencyLevel
consistency, ClientState clientState) throws RequestValidationException,
RequestExecutionException
{
- return PartitionIterators.EMPTY;
+ return EmptyIterators.partition();
}
- public PartitionIterator fetchPageInternal(int pageSize,
ReadOrderGroup orderGroup) throws RequestValidationException,
RequestExecutionException
+ public PartitionIterator fetchPageInternal(int pageSize,
ReadExecutionController executionController) throws RequestValidationException,
RequestExecutionException
{
- return PartitionIterators.EMPTY;
+ return EmptyIterators.partition();
}
public boolean isExhausted()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/928e4c28/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------