This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by
this push:
new d946c22b8a CEP-45: Fix ALLOW FILTERING queries for mutation tracking
d946c22b8a is described below
commit d946c22b8a5e3ac7720df4355dfdfbe1eade023f
Author: Blake Eggleston <[email protected]>
AuthorDate: Wed Apr 30 10:52:30 2025 -0700
CEP-45: Fix ALLOW FILTERING queries for mutation tracking
Patch by Blake Eggleston; Reviewed by Aleksey Yeschenko for CASSANDRA-20555
---
src/java/org/apache/cassandra/db/ReadCommand.java | 31 +-
.../cassandra/db/SinglePartitionReadCommand.java | 17 +
.../org/apache/cassandra/db/filter/RowFilter.java | 141 +++--
.../db/partitions/PartitionIterators.java | 49 +-
.../reads/tracked/AbstractPartialTrackedRead.java | 228 ++++++--
.../reads/tracked/ExtendingCompletedRead.java | 186 ++++++
.../reads/tracked/FilteredFollowupRead.java | 177 ++++++
.../reads/tracked/PartialTrackedRangeRead.java | 638 +++++++++++++++------
.../service/reads/tracked/PartialTrackedRead.java | 25 +-
.../tracked/PartialTrackedSinglePartitionRead.java | 110 ++--
.../service/reads/tracked/TrackedDataResponse.java | 87 ++-
.../reads/tracked/TrackedLocalReadCoordinator.java | 18 +-
.../service/reads/tracked/TrackedLocalReads.java | 5 +-
.../service/reads/tracked/TrackedRead.java | 35 +-
test/conf/logback-dtest.xml | 8 +-
.../test/ReadRepairRangeQueriesTest.java | 123 +++-
.../test/ReadRepairSliceQueriesTest.java | 3 -
.../tracking/MutationTrackingPendingReadTest.java | 2 +-
.../MutationTrackingReadReconciliationTest.java | 36 +-
19 files changed, 1531 insertions(+), 388 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 453cfcf95b..7920a41d94 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -476,6 +476,24 @@ public abstract class ReadCommand extends AbstractReadQuery
}
}
+ public RowFilter rowFilter(Index.Searcher searcher)
+ {
+ // If we've used a 2ndary index, we know the results already satisfy the primary expression used, so
+ // no point in checking it again.
+ return (null == searcher) ? rowFilter() : indexQueryPlan.postIndexQueryFilter();
+ }
+
+ private UnfilteredPartitionIterator withRowFilter(UnfilteredPartitionIterator iterator, Index.Searcher searcher)
+ {
+ /*
+ * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
+ * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
+ * would be more efficient (the sooner we discard stuff we know we don't care about, the less useless
+ * processing we do on it).
+ */
+ return rowFilter(searcher).filter(iterator, nowInSec());
+ }
+
private UnfilteredPartitionIterator completeRead(UnfilteredPartitionIterator iterator, ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos)
{
COMMAND.set(this);
@@ -488,18 +506,7 @@ public abstract class ReadCommand extends AbstractReadQuery
iterator = maybeRecordPurgeableTombstones(iterator, cfs);
iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
-
- // If we've used a 2ndary index, we know the results already satisfy the primary expression used, so
- // no point in checking it again.
- RowFilter filter = (null == searcher) ? rowFilter() : indexQueryPlan.postIndexQueryFilter();
-
- /*
- * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
- * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
- * would be more efficient (the sooner we discard stuff we know we don't care about, the less useless
- * processing we do on it).
- */
- iterator = filter.filter(iterator, nowInSec());
+ iterator = withRowFilter(iterator, searcher);
// apply the limits/row counter; this transformation is stopping and would close the iterator as soon
// as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
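[For context: rowFilter(searcher) picks the residual filter when an index has already guaranteed the primary expression, and withRowFilter() decorates the iterator with it. A minimal standalone sketch of the same selection-then-decorate pattern, with hypothetical stand-in types rather than Cassandra's actual API:]

import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Stand-ins: the Predicate plays the role of RowFilter; a non-null "searcher"
// means an index already guaranteed the primary expression, so only the
// residual (post-index) filter needs to run.
class RowFilterSelectionSketch
{
    record Row(String pk, int v) {}

    static final Predicate<Row> fullFilter = r -> r.v() > 10;
    static final Predicate<Row> postIndexFilter = r -> true; // index guaranteed v > 10

    // mirrors rowFilter(searcher): pick the residual filter when an index was used
    static Predicate<Row> rowFilter(Object searcher)
    {
        return (null == searcher) ? fullFilter : postIndexFilter;
    }

    // mirrors withRowFilter(iterator, searcher): apply the chosen filter to the rows
    static List<Row> withRowFilter(List<Row> rows, Object searcher)
    {
        return rows.stream().filter(rowFilter(searcher)).collect(Collectors.toList());
    }

    public static void main(String[] args)
    {
        List<Row> rows = List.of(new Row("a", 5), new Row("b", 42));
        System.out.println(withRowFilter(rows, null));         // full filter: [Row[pk=b, v=42]]
        System.out.println(withRowFilter(rows, new Object())); // index path: both rows pass
    }
}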
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 018be1dcae..1a4d12fa98 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -162,6 +162,23 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
dataRange);
}
+ public static SinglePartitionReadCommand fromRangeRead(DecoratedKey key, PartitionRangeReadCommand command, DataLimits limits)
+ {
+ return create(command.serializedAtEpoch(),
+ command.isDigestQuery(),
+ command.digestVersion(),
+ command.acceptsTransient(),
+ command.metadata(),
+ command.nowInSec(),
+ command.columnFilter(),
+ command.rowFilter(),
+ limits,
+ key,
+ command.clusteringIndexFilter(key),
+ command.indexQueryPlan(),
+ command.isTrackingWarnings());
+ }
+
/**
* Creates a new read command on a single partition.
*
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4f6b7bd1d5..3bd07dc943 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.MapType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.BaseRowIterator;
import org.apache.cassandra.db.rows.Cell;
@@ -58,6 +59,7 @@ import org.apache.cassandra.db.rows.CellPath;
import org.apache.cassandra.db.rows.ComplexColumnData;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -200,73 +202,118 @@ public class RowFilter implements Iterable<RowFilter.Expression>
return false;
}
- /**
- * Note that the application of this transformation does not yet take {@link #isStrict()} into account. This means
- * that even when strict filtering is not safe, expressions will be applied as intersections rather than unions.
- * The filter will always be evaluated strictly in conjunction with replica filtering protection at the
- * coordinator, however, even after CASSANDRA-19007 is addressed.
- *
- * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19007">CASSANDRA-19007</a>
- */
- protected Transformation<BaseRowIterator<?>> filter(TableMetadata metadata, long nowInSec)
+ public static class RowFilterTransformation extends Transformation<BaseRowIterator<?>>
{
- List<Expression> partitionLevelExpressions = new ArrayList<>();
- List<Expression> rowLevelExpressions = new ArrayList<>();
- for (Expression e: expressions)
+ private final TableMetadata metadata;
+ private final long nowInSec;
+ private final List<Expression> partitionLevelExpressions = new ArrayList<>();
+ private final List<Expression> rowLevelExpressions = new ArrayList<>();
+ private DecoratedKey pk;
+
+ private RowFilterTransformation(RowFilter filter, TableMetadata metadata, long nowInSec)
{
- if (e.column.isStatic() || e.column.isPartitionKey())
- partitionLevelExpressions.add(e);
- else
- rowLevelExpressions.add(e);
+ this.metadata = metadata;
+ this.nowInSec = nowInSec;
+ for (Expression e: filter.expressions)
+ {
+ if (e.column.isStatic() || e.column.isPartitionKey())
+ partitionLevelExpressions.add(e);
+ else
+ rowLevelExpressions.add(e);
+ }
}
- long numberOfRegularColumnExpressions = rowLevelExpressions.size();
- final boolean filterNonStaticColumns = numberOfRegularColumnExpressions > 0;
-
- return new Transformation<>()
+ public int potentialMatches(PartitionUpdate update)
{
- DecoratedKey pk;
-
- @Override
- protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+ try (UnfilteredRowIterator partition = update.unfilteredIterator())
{
- pk = partition.partitionKey();
-
- // Short-circuit all partitions that won't match based on static and partition keys
+ int matches = 0;
for (Expression e : partitionLevelExpressions)
- if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec))
+ {
+ if (e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec))
{
- partition.close();
- return null;
+ matches++;
+ break;
}
+ }
- BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
- ? Transformation.apply((UnfilteredRowIterator) partition, this)
- : Transformation.apply((RowIterator) partition, this);
+ while (partition.hasNext())
+ {
+ Unfiltered unfiltered = partition.next();
+
+ if (unfiltered instanceof Row)
+ {
+ Row row = (Row) unfiltered;
+
+ for (Expression e : rowLevelExpressions)
+ {
+ if (e.isSatisfiedBy(metadata, pk, row, nowInSec))
+ {
+ matches++;
+ break;
+ }
+ }
+ }
+ }
+ return matches;
+ }
+ }
- if (filterNonStaticColumns && !iterator.hasNext())
+ @Override
+ protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+ {
+ pk = partition.partitionKey();
+
+ // Short-circuit all partitions that won't match based on static and partition keys
+ for (Expression e : partitionLevelExpressions)
+ {
+ if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec))
{
- iterator.close();
+ partition.close();
return null;
}
-
- return iterator;
}
- @Override
- public Row applyToRow(Row row)
+ BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
+ ? Transformation.apply((UnfilteredRowIterator) partition, this)
+ : Transformation.apply((RowIterator) partition, this);
+
+ boolean filterNonStaticColumns = !rowLevelExpressions.isEmpty();
+ if (filterNonStaticColumns && !iterator.hasNext())
{
- Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness());
- if (purged == null)
+ iterator.close();
+ return null;
+ }
+
+ return iterator;
+ }
+
+ @Override
+ public Row applyToRow(Row row)
+ {
+ Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness());
+ if (purged == null)
+ return null;
+
+ for (Expression e : rowLevelExpressions)
+ if (!e.isSatisfiedBy(metadata, pk, purged, nowInSec))
return null;
- for (Expression e : rowLevelExpressions)
- if (!e.isSatisfiedBy(metadata, pk, purged, nowInSec))
- return null;
+ return row;
+ }
+ }
- return row;
- }
- };
+ /**
+ * Note that the application of this transformation does not yet take {@link #isStrict()} into account. This means
+ * that even when strict filtering is not safe, expressions will be applied as intersections rather than unions.
+ * The filter will always be evaluated strictly in conjunction with replica filtering protection at the
+ * coordinator, however, even after CASSANDRA-19007 is addressed.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19007">CASSANDRA-19007</a>
+ */
+ public RowFilterTransformation filter(TableMetadata metadata, long nowInSec)
+ {
+ return new RowFilterTransformation(this, metadata, nowInSec);
}
/**
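[The new RowFilterTransformation pulls the expression split and short-circuit out of the old anonymous class so it can also be reused to count potential matches in a PartitionUpdate. A minimal standalone sketch of that partition-level/row-level split, with hypothetical simplified types rather than the real RowFilter:]

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

class ExpressionSplitSketch
{
    record Partition(String key, int staticValue, List<Integer> rows) {}

    // Stand-in for RowFilter.Expression: partition-level when it only needs the
    // partition key or static row to be evaluated.
    record Expr(boolean partitionLevel, Predicate<Partition> onPartition, Predicate<Integer> onRow) {}

    static List<Integer> filter(Partition p, List<Expr> expressions)
    {
        List<Expr> partitionLevel = new ArrayList<>();
        List<Expr> rowLevel = new ArrayList<>();
        for (Expr e : expressions)
            (e.partitionLevel() ? partitionLevel : rowLevel).add(e);

        // Short-circuit: discard the whole partition if a partition-level expression fails
        for (Expr e : partitionLevel)
            if (!e.onPartition().test(p))
                return List.of();

        // Otherwise test each row against the row-level expressions
        List<Integer> out = new ArrayList<>();
        for (Integer row : p.rows())
            if (rowLevel.stream().allMatch(e -> e.onRow().test(row)))
                out.add(row);
        return out;
    }

    public static void main(String[] args)
    {
        Partition p = new Partition("k", 7, List.of(1, 20, 30));
        List<Expr> exprs = List.of(new Expr(true, part -> part.staticValue() > 5, r -> true),
                                   new Expr(false, part -> true, r -> r > 10));
        System.out.println(filter(p, exprs)); // [20, 30]
    }
}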
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 9231914201..00e16e9d4d 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.utils.AbstractIterator;
import org.apache.cassandra.db.SinglePartitionReadQuery;
import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.MergeIterator;
public abstract class PartitionIterators
{
@@ -99,6 +100,52 @@ public abstract class PartitionIterators
}
}
+ /**
+ * Merges multiple partition iterators with the requirement that there are no keys in common between any
+ * of the iterators
+ */
+ public static PartitionIterator mergeNonOverlapping(List<PartitionIterator> iterators)
+ {
+ MergeIterator.Reducer<RowIterator, RowIterator> reducer = new MergeIterator.Reducer<>()
+ {
+ RowIterator current;
+
+ @Override
+ protected void onKeyChange()
+ {
+ current = null;
+ }
+
+ @Override
+ public void reduce(int idx, RowIterator partition)
+ {
+ if (current != null)
+ {
+ throw new IllegalStateException("Multiple partitions received for " + current.partitionKey());
+ }
+ current = partition;
+ }
+
+ @Override
+ protected RowIterator getReduced()
+ {
+ return current;
+ }
+ };
+
+ MergeIterator<RowIterator, RowIterator> mergeIterator = MergeIterator.get(iterators, rowIteratorComparator, reducer);
+
+ return new AbstractPartitionIterator()
+ {
+ @Override
+ protected RowIterator computeNext()
+ {
+ return mergeIterator.hasNext() ? mergeIterator.next() : endOfData();
+ }
+ };
+ }
+ private static final Comparator<RowIterator> rowIteratorComparator = Comparator.comparing(BaseRowIterator::partitionKey);
+
/**
* Consumes all rows in the next partition of the provided partition iterator.
*/
@@ -224,5 +271,5 @@ public abstract class PartitionIterators
}
};
}
- };
+ }
}
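[mergeNonOverlapping relies on each input being sorted with pairwise-disjoint key sets, so the reducer can pass partitions straight through and fail loudly on overlap. A minimal standalone sketch of the same contract over sorted integers; stand-in types, since Cassandra's MergeIterator is more general:]

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class NonOverlappingMergeSketch
{
    // Merge sorted iterators whose key sets are disjoint; throw if the same key
    // is produced by more than one input, mirroring the reducer's IllegalStateException.
    static List<Integer> mergeNonOverlapping(List<Iterator<Integer>> inputs)
    {
        record Head(int value, Iterator<Integer> source) {}
        PriorityQueue<Head> heads = new PriorityQueue<>((a, b) -> Integer.compare(a.value(), b.value()));
        for (Iterator<Integer> it : inputs)
            if (it.hasNext())
                heads.add(new Head(it.next(), it));

        List<Integer> merged = new ArrayList<>();
        Integer previous = null;
        while (!heads.isEmpty())
        {
            Head head = heads.poll();
            if (previous != null && previous == head.value())
                throw new IllegalStateException("Multiple partitions received for " + previous);
            merged.add(head.value());
            previous = head.value();
            if (head.source().hasNext())
                heads.add(new Head(head.source().next(), head.source()));
        }
        return merged;
    }

    public static void main(String[] args)
    {
        List<Iterator<Integer>> inputs = List.of(List.of(1, 4, 7).iterator(), List.of(2, 5).iterator());
        System.out.println(mergeNonOverlapping(inputs)); // [1, 2, 4, 5, 7]
    }
}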
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java
index 0cbb3be38d..2819068a43 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java
@@ -18,8 +18,6 @@
package org.apache.cassandra.service.reads.tracked;
-import java.util.List;
-
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
@@ -30,34 +28,176 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.transform.RTBoundValidator;
-import org.apache.cassandra.index.Index;
-import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
public abstract class AbstractPartialTrackedRead implements PartialTrackedRead
{
private static final Logger logger = LoggerFactory.getLogger(AbstractPartialTrackedRead.class);
- private enum State
+ protected interface Augmentable
+ {
+ State augment(PartitionUpdate update);
+ }
+
+ protected static abstract class State
+ {
+ protected static final State CLOSED = new State()
+ {
+ @Override
+ String name()
+ {
+ return "closed";
+ }
+
+ @Override
+ boolean isClosed()
+ {
+ return true;
+ }
+ };
+
+ abstract String name();
+
+ boolean isInitialized()
+ {
+ return false;
+ }
+
+ Initialized asInitialized()
+ {
+ throw new IllegalStateException("State is " + name() + ", not " + Initialized.NAME);
+ }
+
+ boolean isPrepared()
+ {
+ return false;
+ }
+
+ Prepared asPrepared()
+ {
+ throw new IllegalStateException("State is " + name() + ", not " + Prepared.NAME);
+ }
+
+ boolean isCompleted()
+ {
+ return false;
+ }
+
+ Completed asCompleted()
+ {
+ throw new IllegalStateException("State is " + name() + ", not " + Completed.NAME);
+ }
+
+ boolean isAugmentable()
+ {
+ return isPrepared() || isInitialized();
+ }
+
+ Augmentable asAugmentable()
+ {
+ if (isPrepared()) return asPrepared();
+ throw new IllegalStateException("State is " + name() + ", not augmentable");
+ }
+
+ boolean isClosed()
+ {
+ return false;
+ }
+
+ void close()
+ {
+ }
+ }
+
+ // TODO (expected): this is a redundant state, never exposed
+ protected final class Initialized extends State
+ {
+ static final String NAME = "initialized";
+
+ @Override
+ String name()
+ {
+ return NAME;
+ }
+
+ @Override
+ boolean isInitialized()
+ {
+ return true;
+ }
+
+ @Override
+ Initialized asInitialized()
+ {
+ return this;
+ }
+
+ Prepared prepare(UnfilteredPartitionIterator initialData)
+ {
+ return prepareInternal(initialData);
+ }
+ }
+
+ protected abstract Prepared prepareInternal(UnfilteredPartitionIterator initialData);
+
+ protected abstract class Prepared extends State implements Augmentable
{
- INITIALIZED,
- PREPARED,
- READING,
- FINISHED
+ private static final String NAME = "prepared";
+
+ @Override
+ String name()
+ {
+ return NAME;
+ }
+
+ @Override
+ boolean isPrepared()
+ {
+ return true;
+ }
+
+ @Override
+ Prepared asPrepared()
+ {
+ return this;
+ }
+
+ abstract Completed complete();
+
+ }
+
+ protected abstract class Completed extends State
+ {
+ private static final String NAME = "completed";
+
+ @Override
+ String name()
+ {
+ return NAME;
+ }
+
+ protected abstract UnfilteredPartitionIterator iterator();
+ protected abstract CompletedRead createResult(UnfilteredPartitionIterator iterator);
+
+ protected CompletedRead getResult()
+ {
+ UnfilteredPartitionIterator result = command().completeTrackedRead(iterator(), AbstractPartialTrackedRead.this);
+ // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
+ // ends equal, and there are no dangling RT bounds in any partition.
+ result = RTBoundValidator.validate(result, RTBoundValidator.Stage.PROCESSED, true);
+ return createResult(result);
+ }
}
final ReadExecutionController executionController;
- final Index.Searcher searcher;
final ColumnFamilyStore cfs;
final long startTimeNanos;
- volatile State state = State.INITIALIZED;
+ private State state = new Initialized();
- public AbstractPartialTrackedRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos)
+ public AbstractPartialTrackedRead(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos)
{
this.executionController = executionController;
- this.searcher = searcher;
this.cfs = cfs;
this.startTimeNanos = startTimeNanos;
}
@@ -68,12 +208,6 @@ public abstract class AbstractPartialTrackedRead implements PartialTrackedRead
return executionController;
}
- @Override
- public Index.Searcher searcher()
- {
- return searcher;
- }
-
@Override
public ColumnFamilyStore cfs()
{
@@ -86,69 +220,47 @@ public abstract class AbstractPartialTrackedRead implements PartialTrackedRead
return startTimeNanos;
}
- abstract void freezeInitialData();
-
- abstract UnfilteredPartitionIterator initialData();
-
- abstract UnfilteredPartitionIterator augmentedData();
-
- abstract void augmentResponse(PartitionUpdate update);
+ protected synchronized State state()
+ {
+ return state;
+ }
/**
* Implementors need to call this before returning this from createInProgressRead
+ * TODO (expected): this is a redundant transition from a redundant state (INITIALIZED)
*/
- synchronized void prepare()
+ synchronized void prepare(UnfilteredPartitionIterator initialData)
{
logger.trace("Preparing read {}", this);
- Preconditions.checkState(state == State.INITIALIZED);
- freezeInitialData();
- state = State.PREPARED;
+ state = state.asInitialized().prepare(initialData);
}
@Override
- public void augment(Mutation mutation)
+ public synchronized void augment(Mutation mutation)
{
- Preconditions.checkState(state == State.PREPARED);
PartitionUpdate update = mutation.getPartitionUpdate(command().metadata());
if (update != null)
- augmentResponse(update);
+ state = state.asAugmentable().augment(update);
}
- private UnfilteredPartitionIterator complete(UnfilteredPartitionIterator iterator)
- {
- return command().completeTrackedRead(iterator, this);
- }
-
- abstract CompletedRead createResult(UnfilteredPartitionIterator iterator);
-
@Override
public synchronized CompletedRead complete()
{
- Preconditions.checkState(state == State.PREPARED);
- state = State.READING;
-
- UnfilteredPartitionIterator initial = initialData();
- UnfilteredPartitionIterator augmented = augmentedData();
-
- UnfilteredPartitionIterator result = augmented != null ?
- UnfilteredPartitionIterators.merge(List.of(initial, augmented), NOOP) :
- initial;
-
- result = complete(result);
- // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
- // ends equal, and there are no dangling RT bounds in any partition.
- result = RTBoundValidator.validate(result, RTBoundValidator.Stage.PROCESSED, true);
- return createResult(complete(result));
+ Preconditions.checkState(state.isPrepared());
+ Completed completed = state.asPrepared().complete();
+ state = completed;
+ return completed.getResult();
}
@Override
public synchronized void close()
{
- if (state == State.FINISHED)
+ if (state.isClosed())
return;
logger.trace("Closing read {}", this);
+ state.close();
executionController.close();
- state = State.FINISHED;
+ state = State.CLOSED;
}
}
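[The commit replaces the flat State enum with typed state objects whose transitions return the next state, so per-state data lives on the state that owns it and illegal transitions fail with a descriptive error. A minimal standalone sketch of the pattern, with hypothetical payloads rather than the real read types:]

// Each state carries the data valid in that state; transitions return the next
// state, and illegal casts fail with a descriptive IllegalStateException.
class TypedStateSketch
{
    abstract static class State
    {
        abstract String name();
        Prepared asPrepared() { throw new IllegalStateException("State is " + name() + ", not prepared"); }
    }

    static final class Initialized extends State
    {
        String name() { return "initialized"; }
        Prepared prepare(String data) { return new Prepared(data); }
    }

    static final class Prepared extends State
    {
        final String data; // payload only meaningful once prepared
        Prepared(String data) { this.data = data; }
        String name() { return "prepared"; }
        @Override Prepared asPrepared() { return this; }
        Completed complete() { return new Completed(data.toUpperCase()); }
    }

    static final class Completed extends State
    {
        final String result;
        Completed(String result) { this.result = result; }
        String name() { return "completed"; }
    }

    public static void main(String[] args)
    {
        State state = new Initialized();
        state = ((Initialized) state).prepare("partition data");
        state = state.asPrepared().complete();
        System.out.println(((Completed) state).result); // PARTITION DATA
    }
}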
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java
new file mode 100644
index 0000000000..f5fa4dc25a
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.tracked;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+class ExtendingCompletedRead implements PartialTrackedRead.CompletedRead
+{
+ private static final Logger logger = LoggerFactory.getLogger(ExtendingCompletedRead.class);
+
+ final PartitionRangeReadCommand command;
+ final UnfilteredPartitionIterator iterator;
+ // merged end-result counter
+ final DataLimits.Counter mergedResultCounter;
+
+ private final boolean partitionsFetched;
+ private final boolean initialIteratorExhausted;
+ protected final AbstractBounds<PartitionPosition> followUpBounds;
+
+ public ExtendingCompletedRead(PartitionRangeReadCommand command,
+ UnfilteredPartitionIterator iterator,
+ boolean partitionsFetched,
+ boolean initialIteratorExhausted,
+ AbstractBounds<PartitionPosition> followUpBounds)
+ {
+ this.command = command;
+ this.iterator = iterator;
+ mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+ true,
+ command.selectsFullPartition(),
+ command.metadata().enforceStrictLiveness());
+ this.partitionsFetched = partitionsFetched;
+ this.initialIteratorExhausted = initialIteratorExhausted;
+ this.followUpBounds = followUpBounds;
+ }
+
+ @Override
+ public TrackedDataResponse response()
+ {
+ PartitionIterator filtered = UnfilteredPartitionIterators.filter(iterator, command.nowInSec());
+ PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
+ PartitionIterator result = Transformation.apply(counted, new EmptyPartitionsDiscarder());
+ return TrackedDataResponse.create(result, command.columnFilter());
+ }
+
+ static boolean followUpReadRequired(ReadCommand command, DataLimits.Counter mergedResultCounter, boolean initialIteratorExhausted, boolean partitionsFetched)
+ {
+ // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+ if (mergedResultCounter.isDone())
+ return false;
+
+ // we do not apply short read protection when we have no limits at all
+ if (command.limits().isUnlimited())
+ return false;
+
+ /*
+ * If this is a single partition read command or an (indexed) partition range read command with
+ * a partition key specified, then we can't and shouldn't try to fetch more partitions.
+ */
+ if (command.isLimitedToOnePartition())
+ return false;
+
+ /*
+ * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+ *
+ * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
+ * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+ */
+ if (initialIteratorExhausted && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+ return false;
+
+ /*
+ * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+ * There is no point in asking the replica for more rows - it has no more in the requested range.
+ */
+ if (!partitionsFetched)
+ return false;
+
+ return true;
+ }
+
+ protected boolean followUpRequired()
+ {
+ return followUpReadRequired(command, mergedResultCounter, initialIteratorExhausted, partitionsFetched);
+ }
+
+ static int toQuery(ReadCommand command, DataLimits.Counter mergedResultCounter)
+ {
+ /*
+ * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+ * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
+ * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
+ * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+ */
+ return command.limits().count() != DataLimits.NO_LIMIT
+ ? command.limits().count() - mergedResultCounter.rowsCounted()
+ : command.limits().perPartitionCount();
+ }
+
+ @Override
+ public Future<TrackedDataResponse> followupRead(TrackedDataResponse initialResponse, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+ {
+ if (!followUpRequired())
+ return null;
+
+ /*
+ * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+ * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
+ * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
+ * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+ */
+ int toQuery = toQuery(command, mergedResultCounter);
+
+ ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
+ Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
+ logger.info("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
+
+ return makeFollowupRead(initialResponse, toQuery, consistencyLevel, expiresAtNanos);
+ }
+
+ protected Future<TrackedDataResponse> makeFollowupRead(TrackedDataResponse initialResponse, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+ {
+ TrackedRead.Range followUpRead = PartialTrackedRangeRead.makeFollowUpRead(command, followUpBounds, toQuery, consistencyLevel, expiresAtNanos);
+ followUpRead.start(expiresAtNanos);
+ AsyncPromise<TrackedDataResponse> combinedRead = new AsyncPromise<>();
+ followUpRead.future().addCallback((result, failure) -> {
+ if (failure != null)
+ {
+ combinedRead.tryFailure(failure);
+ return;
+ }
+
+ try
+ {
+ combinedRead.trySuccess(TrackedDataResponse.merge(initialResponse, result));
+ }
+ catch (Throwable t)
+ {
+ combinedRead.tryFailure(t);
+ }
+ });
+
+ return combinedRead;
+ }
+
+ @Override
+ public void close()
+ {
+ iterator.close();
+ }
+}
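[The follow-up decision above mirrors short read protection: stop when the merged counter is done, otherwise size the retry from the remaining row budget. A small standalone sketch of the toQuery() arithmetic under assumed limits, with NO_LIMIT standing in for DataLimits.NO_LIMIT:]

class ShortReadBudgetSketch
{
    static final int NO_LIMIT = Integer.MAX_VALUE;

    // mirrors toQuery(): remaining rows under a total limit, else the per-partition limit
    static int toQuery(int totalLimit, int perPartitionLimit, int rowsCounted)
    {
        return totalLimit != NO_LIMIT
               ? totalLimit - rowsCounted
               : perPartitionLimit;
    }

    public static void main(String[] args)
    {
        // LIMIT 100 query that has only reconciled 80 rows so far: ask for 20 more
        System.out.println(toQuery(100, NO_LIMIT, 80)); // 20
        // no total limit, PER PARTITION LIMIT 10: fetch one partition's worth
        System.out.println(toQuery(NO_LIMIT, 10, 80));  // 10
    }
}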
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java b/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java
new file mode 100644
index 0000000000..121ce1a581
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.tracked;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.FollowUpReadInfo;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+import static org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.followUpReadRequired;
+import static org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.toQuery;
+import static org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.makeFollowUpRead;
+
+class FilteredFollowupRead extends AsyncPromise<TrackedDataResponse>
+{
+ private final TrackedDataResponse initialResponse;
+ private final int toQuery;
+ private final ConsistencyLevel consistencyLevel;
+ private final long expiresAtNanos;
+ private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo;
+ private final PartitionRangeReadCommand command;
+ private final AbstractBounds<PartitionPosition> followUpBounds;
+ private final DecoratedKey finalKey;
+
+ public FilteredFollowupRead(TrackedDataResponse initialResponse,
+ int toQuery,
+ ConsistencyLevel consistencyLevel,
+ long expiresAtNanos,
+ SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo,
+ PartitionRangeReadCommand command,
+ AbstractBounds<PartitionPosition> followUpBounds,
+ DecoratedKey finalKey)
+ {
+ this.initialResponse = initialResponse;
+ this.toQuery = toQuery;
+ this.consistencyLevel = consistencyLevel;
+ this.expiresAtNanos = expiresAtNanos;
+ this.followUpReadInfo = followUpReadInfo;
+ this.command = command;
+ this.followUpBounds = followUpBounds;
+ this.finalKey = finalKey;
+ }
+
+ private boolean interleavesWithOriginal(DecoratedKey key)
+ {
+ if (finalKey == null)
+ return false;
+ return key.compareTo(finalKey) < 0;
+ }
+
+ public void start()
+ {
+ ClusterMetadata metadata = ClusterMetadata.current();
+ List<Future<TrackedDataResponse>> futures = new ArrayList<>();
+
+ int remaining = toQuery;
+ PeekingIterator<DecoratedKey> followUpKeys = Iterators.peekingIterator(followUpReadInfo.keySet().iterator());
+ // query all keys that interleave with the range of keys from the original range read
+ while (followUpKeys.hasNext() && (remaining > 0 || interleavesWithOriginal(followUpKeys.peek())))
+ {
+ DecoratedKey key = followUpKeys.next();
+ FollowUpReadInfo info = followUpReadInfo.get(key);
+ remaining -= info.potentialMatches;
+ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fromRangeRead(key, command, command.limits().forShortReadRetry(toQuery));
+ TrackedRead.Partition read = TrackedRead.Partition.create(metadata, cmd, consistencyLevel);
+ read.start(expiresAtNanos);
+ futures.add(read.future());
+ }
+
+ SortedMap<DecoratedKey, FollowUpReadInfo> nextKeys = followUpKeys.hasNext() ? followUpReadInfo.tailMap(followUpKeys.next()) : Collections.emptySortedMap();
+
+ AtomicReference<PartialTrackedRead> partialRead;
+ if (remaining > 0)
+ {
+ partialRead = new AtomicReference<>();
+ TrackedRead.Range rangeRead = makeFollowUpRead(command, followUpBounds, remaining, consistencyLevel, expiresAtNanos);
+ rangeRead.startLocal(expiresAtNanos, partialRead::set);
+ futures.add(rangeRead.future());
+ }
+ else
+ {
+ partialRead = null;
+ }
+
+ FutureCombiner.allOf(futures).addCallback((responses, error) -> {
+ if (error != null)
+ {
+ tryFailure(error);
+ return;
+ }
+
+ try
+ {
+ List<TrackedDataResponse> allResponses = new ArrayList<>(responses);
+ allResponses.add(initialResponse);
+ TrackedDataResponse merged = TrackedDataResponse.merge(allResponses);
+ DataLimits.Counter mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+ true,
+ command.selectsFullPartition(),
+ command.metadata().enforceStrictLiveness());
+
+ boolean partitionsFetched;
+ boolean initialIteratorExhausted;
+ TrackedDataResponse response;
+ try (PartitionIterator iterator = merged.makeIteratorUnlimited(command))
+ {
+ partitionsFetched = iterator.hasNext();
+ response = TrackedDataResponse.create(mergedResultCounter.applyTo(iterator), command.columnFilter());
+ initialIteratorExhausted = iterator.hasNext();
+ }
+
+ // although we check for interleaved keys in the initial read, we always query for them in the follow up, so
+ // we just use normal short read protection checks here
+ if (followUpReadRequired(command, mergedResultCounter, initialIteratorExhausted, partitionsFetched))
+ {
+ AbstractBounds<PartitionPosition> nextBounds = this.followUpBounds;
+ if (partialRead != null)
+ {
+ PartialTrackedRangeRead followUpRangeRead = (PartialTrackedRangeRead) partialRead.get();
+ nextBounds = followUpRangeRead.followUpBounds();
+ }
+ FilteredFollowupRead followUp = new FilteredFollowupRead(response, toQuery(command, mergedResultCounter), consistencyLevel, expiresAtNanos, nextKeys, command, nextBounds, null);
+ followUp.start();
+ followUp.addCallback((result, failure) -> {
+ if (failure != null)
+ tryFailure(failure);
+ else
+ trySuccess(result);
+ });
+ }
+ else
+ {
+ trySuccess(response);
+ }
+ }
+ catch (Throwable t)
+ {
+ tryFailure(t);
+ }
+
+ });
+ }
+}
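[FilteredFollowupRead either completes its promise with the merged response or chains another instance of itself when the result is still short. A minimal standalone sketch of that recursive completion pattern, using CompletableFuture in place of Cassandra's AsyncPromise (an assumed simplification):]

import java.util.concurrent.CompletableFuture;

class ChainedFollowupSketch
{
    // Each round "reads" some rows; if the running total is still short of the limit,
    // chain another round and forward its outcome, otherwise complete with the total.
    static CompletableFuture<Integer> read(int collected, int limit, int perRound)
    {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        CompletableFuture.supplyAsync(() -> collected + perRound).whenComplete((total, error) -> {
            if (error != null)
            {
                result.completeExceptionally(error);
                return;
            }
            if (total < limit)
                read(total, limit, perRound).whenComplete((r, e) -> {
                    if (e != null) result.completeExceptionally(e);
                    else result.complete(r);
                });
            else
                result.complete(total);
        });
        return result;
    }

    public static void main(String[] args)
    {
        System.out.println(read(0, 100, 40).join()); // 120 after three rounds
    }
}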
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java
index 4f50442950..1a6c91db4f 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java
@@ -18,7 +18,9 @@
package org.apache.cassandra.service.reads.tracked;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -36,15 +38,13 @@ import org.apache.cassandra.db.PartitionRangeReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.partitions.AbstractBTreePartition;
import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.SimpleBTreePartition;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.ExcludingBounds;
@@ -54,39 +54,45 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Future;
-public class PartialTrackedRangeRead extends AbstractPartialTrackedRead
+public abstract class PartialTrackedRangeRead extends AbstractPartialTrackedRead
{
private static final Logger logger = LoggerFactory.getLogger(PartialTrackedRangeRead.class);
- private final PartitionRangeReadCommand command;
- private final SortedMap<DecoratedKey, SimpleBTreePartition> data = new TreeMap<>();
- private final UnfilteredPartitionIterator initialData;
- private final boolean enforceStrictLiveness;
+ protected static class FollowUpReadInfo
+ {
+ int potentialMatches = 0;
+ }
- // short read support
- private DecoratedKey lastPartitionKey; // key of the last observed partition
- private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
- private boolean initialIteratorExhausted;
- private boolean wasAugmented;
- AbstractBounds<PartitionPosition> followUpBounds;
+ protected final PartitionRangeReadCommand command;
- public PartialTrackedRangeRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command, UnfilteredPartitionIterator initialData)
+ private PartialTrackedRangeRead(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command)
{
- super(executionController, searcher, cfs, startTimeNanos);
+ super(executionController, cfs, startTimeNanos);
this.command = command;
- this.initialData = initialData;
- this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
}
public static PartialTrackedRangeRead create(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command, UnfilteredPartitionIterator initialData)
{
- PartialTrackedRangeRead read = new PartialTrackedRangeRead(executionController, searcher, cfs, startTimeNanos, command, initialData);
+ RowFilter rowFilter = command.rowFilter();
+ PartialTrackedRangeRead read;
+ if (searcher != null)
+ {
+ throw new UnsupportedOperationException("TODO: CASSANDRA-20374");
+ }
+ else if (!rowFilter.isEmpty())
+ {
+ read = new PartialTrackedRangeRead.Filtered(executionController, cfs, startTimeNanos, command);
+ }
+ else
+ {
+ read = new PartialTrackedRangeRead.Simple(executionController, cfs, startTimeNanos, command);
+ }
+
try
{
- read.prepare();
+ read.prepare(initialData);
return read;
}
catch (Throwable e)
@@ -102,6 +108,227 @@ public class PartialTrackedRangeRead extends AbstractPartialTrackedRead
return command;
}
+ protected static class ShortReadSupport
+ {
+ final DecoratedKey lastPartitionKey; // key of the last observed partition
+ final boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+ final boolean initialIteratorExhausted;
+ final AbstractBounds<PartitionPosition> followUpBounds;
+ boolean wasAugmented;
+
+ ShortReadSupport(Builder builder, boolean initialIteratorExhausted, AbstractBounds<PartitionPosition> followUpBounds)
+ {
+ this.lastPartitionKey = builder.lastPartitionKey;
+ this.partitionsFetched = builder.partitionsFetched;
+ this.initialIteratorExhausted = initialIteratorExhausted;
+ this.followUpBounds = followUpBounds;
+ this.wasAugmented = false;
+ }
+
+ protected static class Builder
+ {
+ final ReadCommand command;
+ final DataLimits.Counter counter;
+ DecoratedKey lastPartitionKey; // key of the last observed partition
+ boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+
+ protected Builder(ReadCommand command)
+ {
+ this.command = command;
+ counter = command.limits().newCounter(command.nowInSec(),
+ false,
+ command.selectsFullPartition(),
+ command.metadata().enforceStrictLiveness());
+ }
+
+ ShortReadSupport build()
+ {
+ boolean initialIteratorExhausted = command.limits().isExhausted(counter);
+ AbstractBounds<PartitionPosition> followUpBounds = null;
+ if (partitionsFetched)
+ {
+ AbstractBounds<PartitionPosition> bounds = command.dataRange().keyRange();
+ followUpBounds = bounds.inclusiveRight()
+ ? new Range<>(lastPartitionKey, bounds.right)
+ : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+ Preconditions.checkState(!followUpBounds.contains(lastPartitionKey));
+ }
+ return new ShortReadSupport(this, initialIteratorExhausted, followUpBounds);
+ }
+ }
+ }
+
+ private abstract class Materializer extends Transformation<UnfilteredRowIterator>
+ {
+ final SortedMap<DecoratedKey, SimpleBTreePartition> data = new TreeMap<>();
+ final ShortReadSupport.Builder shortReadSupport;
+
+ private Materializer(ReadCommand command)
+ {
+ this.shortReadSupport = new ShortReadSupport.Builder(command);
+ }
+
+ abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator);
+
+ abstract RangePrepared createRangePrepared();
+
+ RangePrepared materialize(UnfilteredPartitionIterator inputIterator)
+ {
+ try
+ {
+ UnfilteredPartitionIterator materialized = Transformation.apply(inputIterator, new Transformation<UnfilteredRowIterator>()
+ {
+ @Override
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ SimpleBTreePartition materialized = data.computeIfAbsent(partition.partitionKey(), key -> new SimpleBTreePartition(key, partition.metadata(), UpdateTransaction.NO_OP));
+ materialized.update(PartitionUpdate.fromIterator(partition, command.columnFilter()));
+ shortReadSupport.lastPartitionKey = partition.partitionKey();
+ shortReadSupport.partitionsFetched = true;
+ return queryPartition(materialized);
+ }
+ });
+
+ UnfilteredPartitionIterator filtered = filter(materialized);
+
+ try (UnfilteredPartitionIterator iterator = shortReadSupport.counter.applyTo(filtered))
+ {
+ consume(iterator);
+ }
+
+ return createRangePrepared();
+ }
+ finally
+ {
+ inputIterator.close();
+ }
+ }
+
+ @Override
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+ {
+ SimpleBTreePartition materialized = data.computeIfAbsent(partition.partitionKey(), key -> new SimpleBTreePartition(key, partition.metadata(), UpdateTransaction.NO_OP));
+ materialized.update(PartitionUpdate.fromIterator(partition, command.columnFilter()));
+ shortReadSupport.lastPartitionKey = partition.partitionKey();
+ shortReadSupport.partitionsFetched = true;
+ return queryPartition(materialized);
+ }
+ }
+
+ protected abstract class RangePrepared extends Prepared
+ {
+ protected final SortedMap<DecoratedKey, SimpleBTreePartition> data;
+ protected final ShortReadSupport shortReadSupport;
+ protected boolean wasAugmented;
+
+ public RangePrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport)
+ {
+ this.data = data;
+ this.shortReadSupport = shortReadSupport;
+ }
+
+ protected boolean canAcceptUpdate(PartitionUpdate update)
+ {
+ return shortReadSupport.initialIteratorExhausted || !shortReadSupport.followUpBounds.contains(update.partitionKey());
+ }
+
+ private SimpleBTreePartition augmentResponseInternal(PartitionUpdate update)
+ {
+ SimpleBTreePartition partition = data.computeIfAbsent(update.partitionKey(), key -> new SimpleBTreePartition(key, update.metadata(), UpdateTransaction.NO_OP));
+ partition.update(update);
+ return partition;
+ }
+
+ @Override
+ public State augment(PartitionUpdate update)
+ {
+ // if the input iterator reached the row limit, then we can't apply any augmenting mutations that are past
+ // the last materialized key. Since we wouldn't have materialized the local data for that key, applying an
+ // update would cause us to return incomplete data for it.
+ if (canAcceptUpdate(update))
+ {
+ logger.trace("Augmented partition {} for read {}", update.partitionKey(), PartialTrackedRangeRead.this);
+ augmentResponseInternal(update);
+ }
+ else
+ {
+ logger.trace("Ignoring unacceptable update from key {} on read {}", update.partitionKey(), PartialTrackedRangeRead.this);
+ }
+ wasAugmented = true;
+ return this;
+ }
+ }
+
+ protected abstract class RangeCompleted extends Completed
+ {
+ protected final SortedMap<DecoratedKey, SimpleBTreePartition> data;
+ protected final ShortReadSupport shortReadSupport;
+ protected final boolean wasAugmented;
+
+ public RangeCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented)
+ {
+ this.data = data;
+ this.shortReadSupport = shortReadSupport;
+ this.wasAugmented = wasAugmented;
+ }
+
+ @Override
+ protected UnfilteredPartitionIterator iterator()
+ {
+ Iterator<SimpleBTreePartition> iterator = data.values().iterator();
+ return new AbstractUnfilteredPartitionIterator()
+ {
+ @Override
+ public TableMetadata metadata()
+ {
+ return command.metadata();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public UnfilteredRowIterator next()
+ {
+ return queryPartition(iterator.next());
+ }
+ };
+ }
+
+ protected abstract CompletedRead extendRead(UnfilteredPartitionIterator iterator);
+
+ @Override
+ protected CompletedRead createResult(UnfilteredPartitionIterator iterator)
+ {
+ if (wasAugmented)
+ return extendRead(iterator);
+ return CompletedRead.simple(iterator, command);
+ }
+
+ AbstractBounds<PartitionPosition> followUpBounds()
+ {
+ return shortReadSupport.followUpBounds;
+ }
+ }
+
+ abstract Materializer createMaterializer();
+
+ @Override
+ protected Prepared prepareInternal(UnfilteredPartitionIterator initialData)
+ {
+ // memtable contents are frozen at read completion time, when the iterator is evaluated, not at the beginning
+ // of the read, when references to memtables and sstables are collected. Because of this, replica coordinated
+ // reads can cause read monotonicity to be broken by returning data that hasn't been replicated to at least
+ // CL other nodes via reconciliation. To prevent this, the contents of the initial iterator are materialized
+ // onto heap at partition granularity until the limits of the read are reached.
+
+ Materializer materializer = createMaterializer();
+ return materializer.materialize(initialData);
+ }
+
UnfilteredRowIterator queryPartition(AbstractBTreePartition partition)
{
return partition.unfilteredIterator(command.columnFilter(),
@@ -121,227 +348,264 @@ public class PartialTrackedRangeRead extends AbstractPartialTrackedRead
}
}
- @Override
- void freezeInitialData()
+ public AbstractBounds<PartitionPosition> followUpBounds()
{
- // memtable contents are frozen at read completion time, when the iterator is evaluated, not at the beginning
- // of the read, when references to memtables and sstables are collected. Because of this, replica coordinated
- // reads can cause read monotonicity to be broken by returning data that hasn't been replicated to at least
- // CL other nodes via reconciliation. To prevent this, the contents of the initial iterator are materialized
- // onto heap at partition granularity until the limits of the read are reached.
+ RangeCompleted completed = (RangeCompleted) state().asCompleted();
+ return completed.followUpBounds();
+ }
+
+ protected static TrackedRead.Range makeFollowUpRead(PartitionRangeReadCommand command, AbstractBounds<PartitionPosition> followUpBounds, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+ {
+ DataLimits newLimits = command.limits().forShortReadRetry(toQuery);
+
+ DataRange newDataRange = command.dataRange().forSubRange(followUpBounds);
+
+ Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+ PartitionRangeReadCommand followUpCmd = command.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+ ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forRangeRead(keyspace,
+ followUpCmd.indexQueryPlan(),
+ consistencyLevel,
+ followUpCmd.dataRange().keyRange(),
+ 1);
- UnfilteredPartitionIterator materializer = new AbstractUnfilteredPartitionIterator()
+ TrackedRead.Range read = TrackedRead.Range.create(followUpCmd, replicaPlan);
+ logger.trace("Short read detected, starting followup read {}", read);
+ return read;
+ }
+
+ static class Simple extends PartialTrackedRangeRead
+ {
+ private class SimplePrepared extends RangePrepared
{
- @Override
- public TableMetadata metadata()
+ public SimplePrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport)
{
- return initialData.metadata();
+ super(data, shortReadSupport);
}
@Override
- public boolean hasNext()
+ Completed complete()
{
- return initialData.hasNext();
+ return new SimpleCompleted(data, shortReadSupport, wasAugmented);
}
+ }
- @Override
- public UnfilteredRowIterator next()
+ protected class SimpleCompleted extends RangeCompleted
+ {
+ public SimpleCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented)
{
- try (UnfilteredRowIterator rowIterator = initialData.next())
- {
- SimpleBTreePartition partition = augmentResponseInternal(PartitionUpdate.fromIterator(rowIterator, command.columnFilter()));
- lastPartitionKey = partition.partitionKey();
- partitionsFetched = true;
- return queryPartition(partition);
- }
+ super(data, shortReadSupport, wasAugmented);
}
@Override
- public void close()
+ protected CompletedRead extendRead(UnfilteredPartitionIterator iterator)
{
- super.close();
- initialData.close();
+ return new ExtendingCompletedRead(command, iterator, shortReadSupport.partitionsFetched, shortReadSupport.initialIteratorExhausted, shortReadSupport.followUpBounds);
}
- };
-
- // unmerged per-source counter
- final DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(),
- false,
- command.selectsFullPartition(),
- enforceStrictLiveness);
- try (UnfilteredPartitionIterator iterator = singleResultCounter.applyTo(materializer))
+ }
+
+ public Simple(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command)
+ {
+ super(executionController, cfs, startTimeNanos, command);
+ }
+
+ @Override
+ Materializer createMaterializer()
{
- consume(iterator);
+ return new Materializer(command)
+ {
+ @Override
+ UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator)
+ {
+ Preconditions.checkState(command().rowFilter().isEmpty());
+ return iterator;
+ }
+
+ @Override
+ RangePrepared createRangePrepared()
+ {
+ return new SimplePrepared(data, shortReadSupport.build());
+ }
+ };
}
- initialIteratorExhausted = command.limits().isExhausted(singleResultCounter);
- if (partitionsFetched)
+
+ @Override
+ public Index.Searcher searcher()
{
- AbstractBounds<PartitionPosition> bounds = command.dataRange().keyRange();
- followUpBounds = bounds.inclusiveRight()
- ? new Range<>(lastPartitionKey, bounds.right)
- : new ExcludingBounds<>(lastPartitionKey, bounds.right);
- Preconditions.checkState(!followUpBounds.contains(lastPartitionKey));
+ return null;
}
- wasAugmented = false;
}
- @Override
- UnfilteredPartitionIterator initialData()
+
+ /**
+ * Since ALLOW FILTERING reads can cover a lot of partitions without returning much data, we don't want to eagerly
+ * materialize partitions onto the heap and keep them there. So this filters out non-matching partitions from the
+ * freezeInitialData phase. However, if reconciliation receives a mutation that applies to a previously discarded
+ * partition AND the contents of that mutation match the row filter, we also need to retry the read against that
+ * partition so we don't return incomplete data. This class handles both jobs.
+ */
+ static class Filtered extends PartialTrackedRangeRead
{
- Iterator<SimpleBTreePartition> iterator = data.values().iterator();
- return new AbstractUnfilteredPartitionIterator()
+
+ protected class FilteredPrepared extends RangePrepared
{
- @Override
- public TableMetadata metadata()
+ private final Set<DecoratedKey> filteredKeys;
+ private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo = new TreeMap<>();
+ private final RowFilter.RowFilterTransformation filter;
+ public FilteredPrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, Set<DecoratedKey> filteredKeys, RowFilter.RowFilterTransformation filter)
{
- return command.metadata();
+ super(data, shortReadSupport);
+ this.filteredKeys = filteredKeys;
+ this.filter = filter;
}
@Override
- public boolean hasNext()
+ protected boolean canAcceptUpdate(PartitionUpdate update)
{
- return iterator.hasNext();
+ DecoratedKey key = update.partitionKey();
+ if (filteredKeys.contains(key))
+ {
+ int matches = filter.potentialMatches(update);
+ if (matches > 0)
+ {
+ FollowUpReadInfo info = followUpReadInfo.computeIfAbsent(key, k -> new FollowUpReadInfo());
+ info.potentialMatches += matches;
+ }
+ logger.trace("Not applying update for previously filtered
partition: {}", update.partitionKey());
+ return false;
+ }
+ return super.canAcceptUpdate(update);
}
@Override
- public UnfilteredRowIterator next()
+ Completed complete()
{
- return queryPartition(iterator.next());
+ return new FilteredCompleted(data, shortReadSupport, wasAugmented, followUpReadInfo);
}
- };
- }
-
- @Override
- UnfilteredPartitionIterator augmentedData()
- {
- return null;
- }
-
- private SimpleBTreePartition augmentResponseInternal(PartitionUpdate update)
- {
- SimpleBTreePartition partition = data.computeIfAbsent(update.partitionKey(), key -> new SimpleBTreePartition(key, update.metadata(), UpdateTransaction.NO_OP));
- partition.update(update);
- return partition;
- }
-
- @Override
- void augmentResponse(PartitionUpdate update)
- {
- // if the input iterator reached the row limit, then we can't apply any augmenting mutations that are past
- // the last materialized key. Since we wouldn't have materialized the local data for that key, applying an
- // update would cause us to return incomplete data for it.
- if (initialIteratorExhausted || !followUpBounds.contains(update.partitionKey()))
- augmentResponseInternal(update);
- wasAugmented = true;
- }
+ }
- private class ExtendingCompletedRead implements CompletedRead
- {
+ protected class FilteredMaterializer extends Materializer
+ {
+ private final Set<DecoratedKey> filteredKeys = new HashSet<>();
+ private final RowFilter.RowFilterTransformation filter;
+ public FilteredMaterializer(ReadCommand command)
+ {
+ super(command);
+ filter = command.rowFilter().filter(command().metadata(), command().nowInSec());
+ }
- final UnfilteredPartitionIterator iterator;
- // merged end-result counter
- final DataLimits.Counter mergedResultCounter = command.limits().newCounter(command.nowInSec(),
- true,
- command.selectsFullPartition(),
- enforceStrictLiveness);
+ @Override
+ UnfilteredPartitionIterator filter(UnfilteredPartitionIterator
iterator)
+ {
+ return Transformation.apply(iterator, new Transformation<>()
+ {
+ @Override
+ protected UnfilteredRowIterator
applyToPartition(UnfilteredRowIterator partition)
+ {
+ if (Transformation.apply(partition, filter).isEmpty())
+ {
+ DecoratedKey key = partition.partitionKey();
+ data.remove(key);
+ filteredKeys.add(key);
+ partition.close();
+ return null;
+ }
+ return partition;
+ }
+ });
+ }
- public ExtendingCompletedRead(UnfilteredPartitionIterator iterator)
- {
- this.iterator = iterator;
+ @Override
+ RangePrepared createRangePrepared()
+ {
+ return new FilteredPrepared(data, shortReadSupport.build(),
filteredKeys, filter);
+ }
}
- @Override
- public PartitionIterator iterator()
+ static class FilteredCompletedRead extends ExtendingCompletedRead
{
- PartitionIterator filtered =
UnfilteredPartitionIterators.filter(iterator, command.nowInSec());
- PartitionIterator counted = Transformation.apply(filtered,
mergedResultCounter);
- return Transformation.apply(counted, new
EmptyPartitionsDiscarder());
- }
+ private final DecoratedKey lastMatchingKey;
+ private final SortedMap<DecoratedKey, FollowUpReadInfo>
followUpReadInfo;
+ public FilteredCompletedRead(PartitionRangeReadCommand command,
UnfilteredPartitionIterator iterator, ShortReadSupport shortReadSupport,
DecoratedKey lastMatchingKey, SortedMap<DecoratedKey, FollowUpReadInfo>
followUpReadInfo)
+ {
+ super(command, iterator, shortReadSupport.partitionsFetched,
shortReadSupport.initialIteratorExhausted, shortReadSupport.followUpBounds);
+ this.lastMatchingKey = lastMatchingKey;
+ this.followUpReadInfo = followUpReadInfo;
+ }
- @Override
- public TrackedRead<?, ?> followupRead(ConsistencyLevel
consistencyLevel, long expiresAtNanos)
- {
- // never try to request additional partitions from replicas if our
reconciled partitions are already filled to the limit
- if (mergedResultCounter.isDone())
- return null;
+        /**
+         * Even if we reached the limit during materialization, if there are keys ahead of the first materialized key
+         * or interleaved with the materialized keys, then we need to read them.
+         * @return true if any follow-up keys sort before the last materialized key, or if nothing was materialized
+         */
+ private boolean hasInterleavedFollowupKeys()
+ {
+ if (followUpReadInfo.isEmpty())
+ return false;
- // we do not apply short read protection when we have no limits at
all
- if (command.limits().isUnlimited())
- return null;
+            if (lastMatchingKey == null) // null means nothing was materialized, so every follow-up key must be read
+                return true;
- /*
- * If this is a single partition read command or an (indexed)
partition range read command with
- * a partition key specified, then we can't and shouldn't try
fetch more partitions.
- */
- if (command.isLimitedToOnePartition())
- return null;
-
- /*
- * If the returned result doesn't have enough rows/partitions to
satisfy even the original limit, don't ask for more.
- *
- * Can only take the short cut if there is no per partition limit
set. Otherwise it's possible to hit false
- * positives due to some rows being uncounted for in certain
scenarios (see CASSANDRA-13911).
- */
- if (initialIteratorExhausted &&
command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
- return null;
+ return followUpReadInfo.firstKey().compareTo(lastMatchingKey)
< 0;
+ }
- /*
- * Either we had an empty iterator as the initial response, or our
moreContents() call got us an empty iterator.
- * There is no point to ask the replica for more rows - it has no
more in the requested range.
- */
- if (!partitionsFetched)
- return null;
- partitionsFetched = false;
-
- /*
- * We are going to fetch one partition at a time for thrift and
potentially more for CQL.
- * The row limit will either be set to the per partition limit -
if the command has no total row limit set, or
- * the total # of rows remaining - if it has some. If we don't
grab enough rows in some of the partitions,
- * then future ShortReadRowsProtection.moreContents() calls will
fetch the missing ones.
- */
- int toQuery = command.limits().count() != DataLimits.NO_LIMIT
- ? command.limits().count() -
mergedResultCounter.rowsCounted()
- : command.limits().perPartitionCount();
+ @Override
+ protected boolean followUpRequired()
+ {
+ return hasInterleavedFollowupKeys() ||
super.followUpRequired();
+ }
-
ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
- Tracing.trace("Requesting {} extra rows from {} for short read
protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
- logger.info("Requesting {} extra rows from {} for short read
protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
+ @Override
+ protected Future<TrackedDataResponse>
makeFollowupRead(TrackedDataResponse initialResponse, int toQuery,
ConsistencyLevel consistencyLevel, long expiresAtNanos)
+ {
+ if (followUpReadInfo.isEmpty())
+ return super.makeFollowupRead(initialResponse, toQuery,
consistencyLevel, expiresAtNanos);
+
+ FilteredFollowupRead followupRead = new
FilteredFollowupRead(initialResponse,
+
toQuery,
+
consistencyLevel,
+
expiresAtNanos,
+
followUpReadInfo,
+
command,
+
followUpBounds,
+
lastMatchingKey);
+
+ followupRead.start();
+ return followupRead;
+ }
- return makeFollowupRead(toQuery, consistencyLevel, expiresAtNanos);
}
- private TrackedRead<?, ?> makeFollowupRead(int toQuery,
ConsistencyLevel consistencyLevel, long expiresAtNanos)
+ private class FilteredCompleted extends RangeCompleted
{
- DataLimits newLimits = command.limits().forShortReadRetry(toQuery);
-
- DataRange newDataRange =
command.dataRange().forSubRange(followUpBounds);
+ private final SortedMap<DecoratedKey, FollowUpReadInfo>
followUpReadInfo;
+ public FilteredCompleted(SortedMap<DecoratedKey,
SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean
wasAugmented, SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo)
+ {
+ super(data, shortReadSupport, wasAugmented);
+ this.followUpReadInfo = followUpReadInfo;
+ }
- Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
- PartitionRangeReadCommand followUpCmd =
command.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
- ReplicaPlan.ForRangeRead replicaPlan =
ReplicaPlans.forRangeRead(keyspace,
-
followUpCmd.indexQueryPlan(),
-
consistencyLevel,
-
followUpCmd.dataRange().keyRange(),
-
1);
+ @Override
+ protected CompletedRead extendRead(UnfilteredPartitionIterator
iterator)
+ {
+                return new FilteredCompletedRead(command, iterator, shortReadSupport, data.isEmpty() ? null : data.lastKey(), followUpReadInfo);
+ }
+ }
- TrackedRead.Range read = TrackedRead.Range.create(followUpCmd,
replicaPlan);
- logger.trace("Short read detected, starting followup read {}",
read);
- read.start(expiresAtNanos);
- return read;
+ public Filtered(ReadExecutionController executionController,
ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command)
+ {
+ super(executionController, cfs, startTimeNanos, command);
}
@Override
- public void close()
+ Materializer createMaterializer()
{
- iterator.close();
+ return new FilteredMaterializer(command);
}
- }
- @Override
- CompletedRead createResult(UnfilteredPartitionIterator iterator)
- {
- if (wasAugmented)
- return new ExtendingCompletedRead(iterator);
- return CompletedRead.simple(iterator, command().nowInSec());
+ @Override
+ public Index.Searcher searcher()
+ {
+ return null;
+ }
}
}
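
For readers skimming the patch, the two jobs described in the Filtered javadoc
above reduce to a small pattern: discard non-matching partitions while
remembering their keys, then decide whether a later mutation against a
discarded key forces a follow-up read. A minimal, self-contained sketch, with
hypothetical names rather than the patch's classes:

    import java.util.HashSet;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.function.Predicate;

    final class FilterAndFollowUp<K extends Comparable<K>, P>
    {
        private final Set<K> filteredKeys = new HashSet<>();
        private final TreeSet<K> followUpKeys = new TreeSet<>();
        private final Predicate<P> matchesFilter;

        FilterAndFollowUp(Predicate<P> matchesFilter)
        {
            this.matchesFilter = matchesFilter;
        }

        /** Materialization phase: keep matching partitions, remember the rest. */
        boolean keep(K key, P partition)
        {
            if (matchesFilter.test(partition))
                return true;
            filteredKeys.add(key); // discarded now, but may resurface during reconciliation
            return false;
        }

        /** Reconciliation phase: a matching update to a discarded key needs a re-read. */
        void onUpdate(K key, P update)
        {
            if (filteredKeys.contains(key) && matchesFilter.test(update))
                followUpKeys.add(key);
        }

        /** Follow-up keys sorting before the last materialized key force a retry. */
        boolean followUpRequired(K lastMaterializedKey)
        {
            if (followUpKeys.isEmpty())
                return false;
            if (lastMaterializedKey == null) // nothing materialized: read them all
                return true;
            return followUpKeys.first().compareTo(lastMaterializedKey) < 0;
        }
    }
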
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
index daf222239c..99e509ee7f 100644
---
a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
+++
b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
@@ -25,33 +25,46 @@ import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.concurrent.Future;
public interface PartialTrackedRead
{
interface CompletedRead extends AutoCloseable
{
- PartitionIterator iterator();
- TrackedRead<?, ?> followupRead(ConsistencyLevel consistencyLevel, long
expiresAtNanos);
+ TrackedDataResponse response(); // must be called from the read stage
+ Future<TrackedDataResponse> followupRead(TrackedDataResponse
initialResponse, ConsistencyLevel consistencyLevel, long expiresAtNanos);
@Override
void close();
- static CompletedRead simple(UnfilteredPartitionIterator partition,
long nowInSec)
+ static TrackedDataResponse createResponse(UnfilteredPartitionIterator
partition, ReadCommand command)
+ {
+ PartitionIterator iterator =
UnfilteredPartitionIterators.filter(partition, command.nowInSec());
+ DataLimits.Counter counter =
command.limits().newCounter(command.nowInSec(),
+ false,
+
command.selectsFullPartition(),
+
command.metadata().enforceStrictLiveness()).onlyCount();
+ return TrackedDataResponse.create(counter.applyTo(iterator),
+ command.columnFilter());
+ }
+
+ static CompletedRead simple(UnfilteredPartitionIterator partition,
ReadCommand command)
{
return new CompletedRead()
{
@Override
- public PartitionIterator iterator()
+ public TrackedDataResponse response()
{
- return UnfilteredPartitionIterators.filter(partition,
nowInSec);
+ return createResponse(partition, command);
}
@Override
- public TrackedRead<?, ?> followupRead(ConsistencyLevel
consistencyLevel, long expiresAtNanos)
+ public Future<TrackedDataResponse>
followupRead(TrackedDataResponse initialRead, ConsistencyLevel
consistencyLevel, long expiresAtNanos)
{
return null;
}
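
The reshaped CompletedRead contract above fits in a few lines: a completed read
hands back a serialized response rather than an iterator, and a follow-up, when
one is needed, resolves asynchronously to an already-merged response. A hedged
sketch, with java.util.concurrent.CompletableFuture standing in for Cassandra's
Future and illustrative names throughout:

    import java.util.concurrent.CompletableFuture;

    interface CompletedReadSketch<R> extends AutoCloseable
    {
        R response();                                 // built on the read stage
        CompletableFuture<R> followupRead(R initial); // null when no follow-up is needed

        @Override
        void close();

        static <R> CompletedReadSketch<R> simple(R precomputed)
        {
            return new CompletedReadSketch<R>()
            {
                @Override
                public R response()
                {
                    return precomputed;
                }

                @Override
                public CompletableFuture<R> followupRead(R initial)
                {
                    return null; // simple reads never require a follow-up
                }

                @Override
                public void close() {}
            };
        }
    }
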
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
index 6aa64a2985..e15dba8a5c 100644
---
a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
+++
b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.service.reads.tracked;
+import java.util.List;
+
import com.google.common.base.Preconditions;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -29,29 +31,31 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.SimpleBTreePartition;
import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.index.Index;
import org.apache.cassandra.index.transactions.UpdateTransaction;
+import static
org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
+
public class PartialTrackedSinglePartitionRead extends
AbstractPartialTrackedRead
{
+ private final Index.Searcher searcher;
private final SinglePartitionReadCommand command;
- private final UnfilteredPartitionIterator initialData;
- private SimpleBTreePartition augmentedData;
- public PartialTrackedSinglePartitionRead(ReadExecutionController
executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long
startTimeNanos, SinglePartitionReadCommand command, UnfilteredPartitionIterator
initialData)
+ public PartialTrackedSinglePartitionRead(ReadExecutionController
executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long
startTimeNanos, SinglePartitionReadCommand command)
{
- super(executionController, searcher, cfs, startTimeNanos);
+ super(executionController, cfs, startTimeNanos);
+ this.searcher = searcher;
this.command = command;
- this.initialData = initialData;
}
public static PartialTrackedSinglePartitionRead
create(ReadExecutionController executionController, Index.Searcher searcher,
ColumnFamilyStore cfs, long startTimeNanos, SinglePartitionReadCommand command,
UnfilteredPartitionIterator initialData)
{
- PartialTrackedSinglePartitionRead read = new
PartialTrackedSinglePartitionRead(executionController, searcher, cfs,
startTimeNanos, command, initialData);
+ PartialTrackedSinglePartitionRead read = new
PartialTrackedSinglePartitionRead(executionController, searcher, cfs,
startTimeNanos, command);
try
{
- read.prepare();
+ read.prepare(initialData);
return read;
}
catch (Throwable e)
@@ -61,48 +65,88 @@ public class PartialTrackedSinglePartitionRead extends
AbstractPartialTrackedRea
}
}
- @Override
- void freezeInitialData()
+ private class SinglePartitionPrepared extends Prepared
{
- // the iterators from queryStorage grabs sstable references and a
- // snapshot of the memtable partition so we don't need to do anything
here
- }
+ private final UnfilteredPartitionIterator initialData;
+ private SimpleBTreePartition augmentedData;
- @Override
- public ReadCommand command()
- {
- return command;
+ private SinglePartitionPrepared(UnfilteredPartitionIterator
initialData)
+ {
+ this.initialData = initialData;
+ }
+
+ @Override
+ public State augment(PartitionUpdate update)
+ {
+
Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey()));
+ if (augmentedData == null)
+ augmentedData = new
SimpleBTreePartition(command.partitionKey(), command.metadata(),
UpdateTransaction.NO_OP);
+
+ augmentedData.update(update);
+ return this;
+ }
+
+ @Override
+ Completed complete()
+ {
+ return new SinglePartitionCompleted(initialData, augmentedData);
+ }
}
- @Override
- UnfilteredPartitionIterator initialData()
+ private class SinglePartitionCompleted extends Completed
{
- return initialData;
+ private final UnfilteredPartitionIterator initialData;
+ private final SimpleBTreePartition augmentedData;
+
+ public SinglePartitionCompleted(UnfilteredPartitionIterator
initialData, SimpleBTreePartition augmentedData)
+ {
+ this.initialData = initialData;
+ this.augmentedData = augmentedData;
+ }
+
+ private UnfilteredPartitionIterator augmentedIterator()
+ {
+ if (augmentedData == null)
+ return null;
+ Slices slices =
command.clusteringIndexFilter().getSlices(command.metadata());
+ UnfilteredRowIterator augmentedPartition =
augmentedData.unfilteredIterator(command.columnFilter(), slices,
command.clusteringIndexFilter().isReversed());
+ return new
SingletonUnfilteredPartitionIterator(augmentedPartition);
+ }
+
+ @Override
+ protected UnfilteredPartitionIterator iterator()
+ {
+ UnfilteredPartitionIterator augmentedIterator =
augmentedIterator();
+ if (augmentedIterator == null)
+ return initialData;
+
+ return UnfilteredPartitionIterators.merge(List.of(initialData,
augmentedIterator), NOOP);
+ }
+
+ @Override
+ protected CompletedRead createResult(UnfilteredPartitionIterator
iterator)
+ {
+ return CompletedRead.simple(iterator, command);
+ }
}
@Override
- UnfilteredPartitionIterator augmentedData()
+ protected Prepared prepareInternal(UnfilteredPartitionIterator initialData)
{
- if (augmentedData == null)
- return null;
- Slices slices =
command.clusteringIndexFilter().getSlices(command.metadata());
- UnfilteredRowIterator augmented =
augmentedData.unfilteredIterator(command.columnFilter(), slices,
command.clusteringIndexFilter().isReversed());
- return new SingletonUnfilteredPartitionIterator(augmented);
+ return new SinglePartitionPrepared(initialData);
}
@Override
- void augmentResponse(PartitionUpdate update)
+ public Index.Searcher searcher()
{
-
Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey()));
- if (augmentedData == null)
- augmentedData = new SimpleBTreePartition(command.partitionKey(),
command.metadata(), UpdateTransaction.NO_OP);
-
- augmentedData.update(update);
+ return searcher;
}
+    // TODO: delete (almost?) everything
+
@Override
- CompletedRead createResult(UnfilteredPartitionIterator iterator)
+ public ReadCommand command()
{
- return CompletedRead.simple(iterator, command().nowInSec());
+ return command;
}
}
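
The Prepared/Completed split above keeps the initial snapshot untouched,
accumulates augmenting updates lazily, and only pays for a merge when
augmentation actually happened. A generic sketch of that shape, using
placeholder types rather than Cassandra's iterators:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    final class AugmentedRead<T extends Comparable<T>>
    {
        private final List<T> initialData;
        private List<T> augmented; // stays null until the first update arrives

        AugmentedRead(List<T> initialData)
        {
            this.initialData = initialData;
        }

        void augment(T update)
        {
            if (augmented == null)
                augmented = new ArrayList<>();
            augmented.add(update);
        }

        List<T> complete()
        {
            if (augmented == null)
                return initialData; // fast path: nothing was augmented
            return Stream.concat(initialData.stream(), augmented.stream())
                         .sorted()
                         .collect(Collectors.toList());
        }
    }
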
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java
index b55095bf02..01fad19fb5 100644
---
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java
+++
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.reads.tracked;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.partitions.PartitionIterator;
import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.io.IVersionedSerializer;
@@ -33,18 +34,63 @@ import org.apache.cassandra.utils.ByteBufferUtil;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
public class TrackedDataResponse
{
private final int serializationVersion;
- private final ByteBuffer data;
+ private final List<ByteBuffer> data;
public TrackedDataResponse(int serializationVersion, ByteBuffer data)
{
+ this(serializationVersion, Collections.singletonList(data));
+ }
+
+ private TrackedDataResponse(int serializationVersion, List<ByteBuffer>
data)
+ {
+ Preconditions.checkArgument(!data.isEmpty());
this.serializationVersion = serializationVersion;
this.data = data;
}
+ public TrackedDataResponse merge(TrackedDataResponse that)
+ {
+ return merge(this, that);
+ }
+
+ public static TrackedDataResponse merge(TrackedDataResponse l,
TrackedDataResponse r)
+ {
+ Preconditions.checkArgument(l.serializationVersion ==
r.serializationVersion);
+ List<ByteBuffer> newData = new ArrayList<>(l.data.size() +
r.data.size());
+ newData.addAll(l.data);
+ newData.addAll(r.data);
+ return new TrackedDataResponse(l.serializationVersion, newData);
+ }
+
+ public static TrackedDataResponse merge(List<TrackedDataResponse>
responses)
+ {
+ Preconditions.checkArgument(!responses.isEmpty());
+
+ int version = responses.get(0).serializationVersion;
+ int size = responses.get(0).data.size();
+
+ for (int i=1,mi=responses.size(); i<mi; i++)
+ {
+ Preconditions.checkState(responses.get(i).serializationVersion ==
version);
+ size += responses.get(i).data.size();
+ }
+
+ List<ByteBuffer> newData = new ArrayList<>(size);
+ for (int i=0,mi=responses.size(); i<mi; i++)
+ newData.addAll(responses.get(i).data);
+
+ return new TrackedDataResponse(version, newData);
+ }
+
public static TrackedDataResponse create(PartitionIterator iter,
ColumnFilter selection)
{
try (DataOutputBuffer buffer = new DataOutputBuffer())
@@ -59,7 +105,7 @@ public class TrackedDataResponse
}
}
- public PartitionIterator makeIterator(ReadCommand command)
+ private static PartitionIterator makeIterator(int serializationVersion,
ByteBuffer data, ReadCommand command)
{
try (DataInputBuffer in = new DataInputBuffer(data, true))
{
@@ -72,28 +118,55 @@ public class TrackedDataResponse
}
}
- public static final IVersionedSerializer<TrackedDataResponse> serializer =
new IVersionedSerializer<TrackedDataResponse>()
+ public PartitionIterator makeIteratorUnlimited(ReadCommand command)
+ {
+ if (data.size() == 1)
+ return makeIterator(serializationVersion, data.get(0), command);
+
+ List<PartitionIterator> iterators = new ArrayList<>(data.size());
+ for (ByteBuffer buffer : data)
+ iterators.add(makeIterator(serializationVersion, buffer, command));
+ return PartitionIterators.mergeNonOverlapping(iterators);
+ }
+
+ public PartitionIterator makeIterator(ReadCommand command)
+ {
+ DataLimits.Counter counter =
command.limits().newCounter(command.nowInSec(),
+ true,
+
command.selectsFullPartition(),
+
command.metadata().enforceStrictLiveness());
+ return counter.applyTo(makeIteratorUnlimited(command));
+ }
+
+ public static final IVersionedSerializer<TrackedDataResponse> serializer =
new IVersionedSerializer<>()
{
@Override
public void serialize(TrackedDataResponse response, DataOutputPlus
out, int version) throws IOException
{
out.writeInt(response.serializationVersion);
- ByteBufferUtil.writeWithVIntLength(response.data, out);
+ out.writeInt(response.data.size());
+ for (ByteBuffer buffer : response.data)
+ ByteBufferUtil.writeWithVIntLength(buffer, out);
}
@Override
public TrackedDataResponse deserialize(DataInputPlus in, int version)
throws IOException
{
int serializationVersion = in.readInt();
- ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
+ int size = in.readInt();
+ List<ByteBuffer> data = new ArrayList<>(size);
+ for (int i = 0; i < size; i++)
+ data.add(ByteBufferUtil.readWithVIntLength(in));
return new TrackedDataResponse(serializationVersion, data);
}
@Override
public long serializedSize(TrackedDataResponse response, int version)
{
- return TypeSizes.sizeof(response.serializationVersion)
- +
ByteBufferUtil.serializedSizeWithVIntLength(response.data);
+ long size = TypeSizes.sizeof(response.serializationVersion) +
TypeSizes.sizeof(response.data.size());
+ for (ByteBuffer buffer : response.data)
+ size += ByteBufferUtil.serializedSizeWithVIntLength(buffer);
+ return size;
}
};
}
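
The multi-buffer response above merges by concatenating independently framed
payloads, which is cheap because nothing is deserialized, and any row limit is
re-applied only when an iterator is finally materialized. A sketch under those
assumptions, with FramedResponse as an illustrative stand-in for
TrackedDataResponse:

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    final class FramedResponse
    {
        final int version;
        final List<ByteBuffer> frames;

        FramedResponse(int version, List<ByteBuffer> frames)
        {
            if (frames.isEmpty())
                throw new IllegalArgumentException("a response needs at least one frame");
            this.version = version;
            this.frames = frames;
        }

        static FramedResponse merge(List<FramedResponse> responses)
        {
            if (responses.isEmpty())
                throw new IllegalArgumentException("nothing to merge");
            int version = responses.get(0).version;
            int total = 0;
            for (FramedResponse r : responses)
            {
                if (r.version != version)
                    throw new IllegalStateException("cannot merge mixed serialization versions");
                total += r.frames.size();
            }
            List<ByteBuffer> merged = new ArrayList<>(total);
            for (FramedResponse r : responses)
                merged.addAll(r.frames); // concatenation is safe: frames are self-delimiting
            return new FramedResponse(version, merged);
        }
    }
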
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java
index 6732adca4b..c62531240a 100644
---
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java
+++
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java
@@ -27,8 +27,6 @@ import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.metrics.ReadRepairMetrics;
@@ -45,6 +43,8 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.AbstractFuture;
import org.apache.cassandra.utils.concurrent.Accumulator;
import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,6 +52,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
+import java.util.function.Consumer;
public class TrackedLocalReadCoordinator
{
@@ -512,7 +513,7 @@ public class TrackedLocalReadCoordinator
});
}
- public void startLocalRead(TrackedRead.Id readId, ReadCommand command,
ReplicaPlan.AbstractForRead<?, ?> replicaPlan, int[] summaryNodes, long
expiresAtNanos)
+ public void startLocalRead(TrackedRead.Id readId, ReadCommand command,
ReplicaPlan.AbstractForRead<?, ?> replicaPlan, int[] summaryNodes, long
expiresAtNanos, Consumer<PartialTrackedRead> partialReadConsumer)
{
synchronized (this)
{
@@ -528,6 +529,8 @@ public class TrackedLocalReadCoordinator
try
{
read = command.beginTrackedRead(controller);
+ if (partialReadConsumer != null)
+ partialReadConsumer.accept(read);
// Create another summary once initial data has been read fully.
We do this to catch
// any mutations that may have arrived during initial read
execution.
secondarySummary = command.createMutationSummary(true);
@@ -555,20 +558,17 @@ public class TrackedLocalReadCoordinator
{
try (PartialTrackedRead.CompletedRead completedRead = read.complete())
{
- TrackedDataResponse response =
TrackedDataResponse.create(completedRead.iterator(), selection);
- TrackedRead<?, ?> followUp =
completedRead.followupRead(consistencyLevel, expiresAtNanos);
+ TrackedDataResponse response = completedRead.response();
+ Future<TrackedDataResponse> followUp =
completedRead.followupRead(response, consistencyLevel, expiresAtNanos);
if (followUp != null)
{
- ReadCommand command = read.command();
- followUp.future().addCallback((iterator, error) -> {
+ followUp.addCallback((newResponse, error) -> {
if (error != null)
{
promise.tryFailure(error);
return;
}
- PartitionIterator previous =
response.makeIterator(command);
- TrackedDataResponse newResponse =
TrackedDataResponse.create(PartitionIterators.concat(List.of(previous,
iterator)), selection);
promise.trySuccess(newResponse);
});
}
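
The coordinator change above boils down to a promise-chaining shape: complete
immediately when there is no follow-up, otherwise complete with the follow-up's
already-merged result. A hedged sketch with CompletableFuture in place of
Cassandra's AsyncPromise:

    import java.util.concurrent.CompletableFuture;

    final class FollowupChaining
    {
        static <R> void completeRead(R initialResponse,
                                     CompletableFuture<R> followUp,
                                     CompletableFuture<R> promise)
        {
            if (followUp == null)
            {
                promise.complete(initialResponse); // no follow-up: the initial response is final
                return;
            }
            followUp.whenComplete((merged, error) -> {
                if (error != null)
                    promise.completeExceptionally(error);
                else
                    promise.complete(merged); // the follow-up already folded in the initial data
            });
        }
    }
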
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
index 5c2417e6b5..d356ce2efa 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -89,7 +90,7 @@ public class TrackedLocalReads implements Shutdownable
getOrCreate(summary.readId()).receiveSummary(from, summary.summary());
}
- public TrackedLocalReadCoordinator beginRead(TrackedRead.Id readId,
ClusterMetadata metadata, ReadCommand command, ConsistencyLevel
consistencyLevel, int[] summaryNodes, long expiresAtNanos)
+ public TrackedLocalReadCoordinator beginRead(TrackedRead.Id readId,
ClusterMetadata metadata, ReadCommand command, ConsistencyLevel
consistencyLevel, int[] summaryNodes, long expiresAtNanos,
Consumer<PartialTrackedRead> partialReadConsumer)
{
Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
ColumnFamilyStore cfs =
keyspace.getColumnFamilyStore(command.metadata().id);
@@ -116,7 +117,7 @@ public class TrackedLocalReads implements Shutdownable
// TODO: confirm all summaryNodes are present in the replica plan
TrackedLocalReadCoordinator coordinator = getOrCreate(readId);
- coordinator.startLocalRead(readId, command, replicaPlan, summaryNodes,
expiresAtNanos);
+ coordinator.startLocalRead(readId, command, replicaPlan, summaryNodes,
expiresAtNanos, partialReadConsumer);
return coordinator;
}
diff --git
a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
index 7356ae785a..a4c4b61230 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -132,7 +133,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
}
}
- private final AsyncPromise<PartitionIterator> future = new
AsyncPromise<>();
+ private final AsyncPromise<TrackedDataResponse> future = new
AsyncPromise<>();
private final Id readId = Id.nextId();
private final ReadCommand command;
@@ -171,6 +172,11 @@ public abstract class TrackedRead<E extends Endpoints<E>,
P extends ReplicaPlan.
protected abstract Verb verb();
+ public boolean intersects(DecoratedKey key)
+ {
+ return command.dataRange().contains(key);
+ }
+
public static class Partition extends TrackedRead<EndpointsForToken,
ReplicaPlan.ForTokenRead>
{
private Partition(SinglePartitionReadCommand command,
ReplicaPlan.AbstractForRead<EndpointsForToken, ReplicaPlan.ForTokenRead>
replicaPlan, ConsistencyLevel consistencyLevel)
@@ -230,7 +236,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
return hostids;
}
- public void start(long expiresAt)
+ private void start(long expiresAt, Consumer<PartialTrackedRead>
partialReadConsumer)
{
// TODO: skip local coordination if this node knows its recovering
from an outage
// TODO: read speculation
@@ -253,8 +259,9 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
if (dataNode == localReplica)
{
+ logger.trace("Locally coordinating {}", readId);
Stage.READ.submit(() -> {
- TrackedLocalReadCoordinator coordinator =
MutationTrackingService.instance.localReads().beginRead(readId,
ClusterMetadata.current(), command, consistencyLevel, summaryHostIds,
expiresAt);
+ TrackedLocalReadCoordinator coordinator =
MutationTrackingService.instance.localReads().beginRead(readId,
ClusterMetadata.current(), command, consistencyLevel, summaryHostIds,
expiresAt, partialReadConsumer);
coordinator.addCallback((response, error) -> {
if (error != null)
{
@@ -269,6 +276,8 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
}
else
{
+ logger.trace("Sending data request for {} to {}", readId,
dataNode.endpoint());
+ Preconditions.checkArgument(partialReadConsumer == null, "Cannot
supply read consumer for nonlocal reads");
DataRequest dataRequest = new DataRequest(readId, command,
consistencyLevel, summaryHostIds);
Message<DataRequest> dataMessage = Message.outWithFlag(verb(),
dataRequest, MessageFlag.CALL_BACK_ON_FAILURE);
MessagingService.instance().sendWithCallback(dataMessage,
dataNode.endpoint(), this);
@@ -286,15 +295,27 @@ public abstract class TrackedRead<E extends Endpoints<E>,
P extends ReplicaPlan.
{
if (localReplica == replica)
{
+ logger.trace("Locally processing summary request for {}",
readId);
Stage.READ.submit(() ->
summaryRequest.executeLocally(summaryMessage, ClusterMetadata.current()));
}
else
{
+ logger.trace("Sending summary request for {} to {}", readId,
replica.endpoint());
MessagingService.instance().send(summaryMessage,
replica.endpoint());
}
}
}
+ public void start(long expiresAt)
+ {
+ start(expiresAt, null);
+ }
+
+ public void startLocal(long expiresAt, Consumer<PartialTrackedRead>
partialReadConsumer)
+ {
+ start(expiresAt, partialReadConsumer);
+ }
+
public void start(Dispatcher.RequestTime requestTime)
{
start(requestTime.computeDeadline(verb().expiresAfterNanos()));
@@ -302,7 +323,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
private void onResponse(TrackedDataResponse response)
{
- future.trySuccess(response.makeIterator(command));
+ future.trySuccess(response);
}
@Override
@@ -323,7 +344,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
return true;
}
- public Future<PartitionIterator> future()
+ public Future<TrackedDataResponse> future()
{
return future;
}
@@ -332,7 +353,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
{
try
{
- return future.get(command.getTimeout(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
+ return future.get(command.getTimeout(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS).makeIterator(command);
}
catch (InterruptedException e)
{
@@ -410,7 +431,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P
extends ReplicaPlan.
@Override
public void executeLocally(Message<? extends Request> message,
ClusterMetadata metadata)
{
- TrackedLocalReadCoordinator coordinator =
MutationTrackingService.instance.localReads().beginRead(readId, metadata,
command, consistencyLevel, summaryNodes, message.expiresAtNanos());
+ TrackedLocalReadCoordinator coordinator =
MutationTrackingService.instance.localReads().beginRead(readId, metadata,
command, consistencyLevel, summaryNodes, message.expiresAtNanos(), null);
coordinator.addCallback((response, error) -> {
if (error != null)
{
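
The new start overloads encode an invariant worth calling out: a
PartialTrackedRead consumer can only observe a read coordinated on the local
node, so remote dispatch must reject it. An illustrative sketch, with
hypothetical names:

    import java.util.function.Consumer;

    final class DispatchSketch<R>
    {
        void start(boolean dataReplicaIsLocal,
                   Consumer<R> partialReadConsumer,
                   Consumer<Consumer<R>> readLocally,
                   Runnable sendDataRequest)
        {
            if (dataReplicaIsLocal)
            {
                readLocally.accept(partialReadConsumer); // the consumer sees the in-flight read
            }
            else
            {
                if (partialReadConsumer != null)
                    throw new IllegalArgumentException("Cannot supply read consumer for nonlocal reads");
                sendDataRequest.run();
            }
        }
    }
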
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
index 48d9859b67..71371d15c9 100644
--- a/test/conf/logback-dtest.xml
+++ b/test/conf/logback-dtest.xml
@@ -45,9 +45,9 @@
<encoder>
<pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L -
%msg%n</pattern>
</encoder>
- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
- <level>DEBUG</level>
- </filter>
+<!-- <filter class="ch.qos.logback.classic.filter.ThresholdFilter">-->
+<!-- <level>DEBUG</level>-->
+<!-- </filter>-->
</appender>
<logger name="io.netty.*" level="WARN"/>
@@ -58,4 +58,6 @@
<appender-ref ref="INSTANCESTDERR" />
<appender-ref ref="INSTANCESTDOUT" />
</root>
+ <logger name="org.apache.cassandra.replication" level="TRACE"/>
+ <logger name="org.apache.cassandra.service.reads.tracked" level="TRACE"/>
</configuration>
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java
index b2af1f938e..3ad81cdc9e 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java
@@ -18,9 +18,15 @@
package org.apache.cassandra.distributed.test;
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.junit.Assume;
import org.junit.Test;
-import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.ByteBufferUtil;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
@@ -156,7 +162,6 @@ public class ReadRepairRangeQueriesTest extends
ReadRepairQueryTester
@Test
public void testRangeQueryWithFilterOnSelectedColumnOnSkinnyTable()
{
- MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW
FILTERING (CASSANDRA-20555)");
tester("WHERE a=2 ALLOW FILTERING")
.createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)")
.mutate("INSERT INTO %s (k, a, b) VALUES (1, 2, 3)",
@@ -183,7 +188,6 @@ public class ReadRepairRangeQueriesTest extends
ReadRepairQueryTester
@Test
public void testRangeQueryWithFilterOnSelectedColumnOnWideTable()
{
- MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW
FILTERING (CASSANDRA-20555)");
tester("WHERE a=1 ALLOW FILTERING")
.createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY
KEY(k, c))")
.mutate("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)",
@@ -215,8 +219,6 @@ public class ReadRepairRangeQueriesTest extends
ReadRepairQueryTester
@Test
public void testRangeQueryWithFilterOnUnselectedColumnOnSkinnyTable()
{
- MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW
FILTERING (CASSANDRA-20555)");
-
tester("WHERE b=3 ALLOW FILTERING")
.createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)")
.mutate("INSERT INTO %s (k, a, b) VALUES (1, 2, 3)",
@@ -243,8 +245,6 @@ public class ReadRepairRangeQueriesTest extends
ReadRepairQueryTester
@Test
public void testRangeQueryWithFilterOnUnselectedColumnOnWideTable()
{
- if (coordinator == 2)
- MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "Depends
on ALLOW FILTERING");
tester("WHERE b=2 ALLOW FILTERING")
.createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY
KEY(k, c))")
.mutate("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)",
@@ -269,4 +269,113 @@ public class ReadRepairRangeQueriesTest extends
ReadRepairQueryTester
rows(row(2, 1, 1, 1)),
rows());
}
+
+ /**
+     * Test range queries using filtering on a selected column on a table with clustering columns, where replicas
+     * have received conflicting updates.
+ */
+ @Test
+ public void testRangeQueryWithFilterOnSelectedColumnConflictingUpdates()
+ {
+ Assume.assumeTrue(replicationType.isTracked()); // flaky for
untracked replication
+ tester("WHERE a=1 ALLOW FILTERING")
+ .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY
KEY(k, c))")
+ .mutate(1, "INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)",
+ "INSERT INTO %s (k, c, a, b) VALUES (1, 2, 2, 2)",
+ "INSERT INTO %s (k, c, a, b) VALUES (2, 1, 2, 1)",
+ "INSERT INTO %s (k, c, a, b) VALUES (2, 2, 2, 2)")
+ .mutate(2, "INSERT INTO %s (k, c, a, b) VALUES (2, 1, 1, 1)")
+ .mutate(1) // updates the last coordinator for mutation tracking
+ .queryColumns("k, c, a", 2, 2,
+ rows(row(1, 1, 1), row(2, 1, 1)),
+ rows(row(1, 1, 1, 1), row(2, 1, 1, 1)),
+ rows(row(1, 1, 1, null), row(2, 1, 1, 1)))
+ .tearDown(2,
+ rows(row(1, 1, 1, 1), row(1, 2, 2, 2), row(2, 1, 1, 1),
row(2, 2, 2, 2)),
+ rows(row(1, 1, 1, 1), row(2, 1, 1, 1)));
+ }
+
+ /**
+ * Helper class for creating rows in token sorted order
+ */
+ private static class TokenSortedKey
+ {
+ final int key;
+
+ public TokenSortedKey(int key)
+ {
+ this.key = key;
+ }
+
+ Token token()
+ {
+ return
Murmur3Partitioner.instance.getToken(ByteBufferUtil.bytes(key));
+ }
+
+ public Object[] row(Object... values)
+ {
+ Object[] row = new Object[values.length + 1];
+ row[0] = key;
+ System.arraycopy(values, 0, row, 1, values.length);
+ return row;
+ }
+
+ public static TokenSortedKey[] create(int numKeys)
+ {
+ TokenSortedKey[] keys = new TokenSortedKey[numKeys];
+ for (int i = 0; i < numKeys; i++)
+ keys[i] = new TokenSortedKey(i);
+ Arrays.sort(keys, Comparator.comparing(TokenSortedKey::token));
+ return keys;
+ }
+
+ }
+
+ /**
+     * Tests the ALLOW FILTERING case where the local coordinator filters out a partition that hashes to a token
+     * between two valid matches, and another replica reports a mutation that causes it to be re-added to the result.
+ */
+ @Test
+ public void testFilteredInterleavedShortRead()
+ {
+ Assume.assumeTrue(replicationType.isTracked()); // flaky for
untracked replication
+ TokenSortedKey[] keys = TokenSortedKey.create(4);
+
+ tester("WHERE a=1 ALLOW FILTERING")
+ .createTable("CREATE TABLE %s (k int, c int, a int, PRIMARY KEY(k,
c))")
+ .mutate(1, "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 1,
1)",
+ "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 2, 2)",
+ "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1, 2)",
+ "INSERT INTO %s (k, c, a) VALUES (" + keys[2].key + ", 1, 1)")
+ .mutate(2, "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1,
1)") // make keys[1] match the filter
+ .mutate(1) // updates the last coordinator for mutation tracking
+ .queryColumns("k, c, a", 3, 0,
+ rows(keys[0].row(1, 1), keys[1].row(1, 1),
keys[2].row(1, 1)),
+ rows(keys[0].row(1, 1), keys[1].row(1, 1),
keys[2].row(1, 1)),
+ rows(keys[0].row(1, 1), keys[1].row(1, 1),
keys[2].row(1, 1)))
+ .tearDown(1,
+ rows(keys[0].row(1, 1), keys[0].row(2, 2), keys[1].row(1,
1), keys[2].row(1, 1)),
+ rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1,
1)));
+ }
+
+ @Test
+ public void testNonFilteredInterleavedShortRead()
+ {
+ Assume.assumeTrue(replicationType.isTracked()); // flaky for
untracked replication
+ TokenSortedKey[] keys = TokenSortedKey.create(4);
+
+ tester("WHERE a=1 ALLOW FILTERING")
+ .createTable("CREATE TABLE %s (k int, c int, a int, PRIMARY KEY(k,
c))")
+ .mutate(1, "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 1,
1)",
+ "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 2, 2)",
+ "INSERT INTO %s (k, c, a) VALUES (" + keys[2].key + ", 1, 1)")
+ .mutate(2, "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1,
1)") // make keys[1] match the filter
+ .mutate(1) // updates the last coordinator for mutation tracking
+ .queryColumns("k, c, a", 3, 0,
+ rows(keys[0].row(1, 1), keys[1].row(1, 1),
keys[2].row(1, 1)),
+ rows(keys[0].row(1, 1), keys[1].row(1, 1),
keys[2].row(1, 1)),
+ rows(keys[0].row(1, 1), keys[1].row(1, 1),
keys[2].row(1, 1)))
+ .tearDown(1,
+ rows(keys[0].row(1, 1), keys[0].row(2, 2), keys[1].row(1,
1), keys[2].row(1, 1)),
+ rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1,
1)));
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java
index 58e9ecca11..62f3702d50 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.distributed.test;
import org.junit.Test;
-import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils;
-
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
/**
@@ -64,7 +62,6 @@ public class ReadRepairSliceQueriesTest extends
ReadRepairQueryTester
@Test
public void testSliceQueryWithFilter()
{
- MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "Depends on
ALLOW FILTERING");
tester("WHERE k=0 AND a>10 AND a<40 ALLOW FILTERING")
.createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY
KEY(k, c))")
.mutate("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 10, 100)",
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java
index 7e17c9c557..7d859ce110 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java
@@ -176,7 +176,7 @@ public class MutationTrackingPendingReadTest
// check that the returned data contains the unapplied mutation
try (PartialTrackedRead.CompletedRead completedRead =
read.complete();
- PartitionIterator partitions = completedRead.iterator())
+ PartitionIterator partitions =
completedRead.response().makeIterator(command))
{
Assert.assertTrue(partitions.hasNext());
try (RowIterator rowIterator = partitions.next())
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java
index 306d6e5092..37f6c6a46f 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.distributed.test.tracking;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.replication.CoordinatorLogId;
import org.apache.cassandra.replication.MutationSummary;
import org.junit.Assert;
@@ -77,6 +79,31 @@ public class MutationTrackingReadReconciliationTest extends
TestBaseImpl
awaitNodeLiveness(from,
InetAddressAndPort.getByAddress(node.broadcastAddress()), true);
}
+ // TODO (expected): remove after speculation is implemented
+ private static Object[][] queryWithRetries(ICoordinator coordinator,
String query) throws InterruptedException
+ {
+ int attempt = 0;
+ for (;;)
+ {
+ attempt++;
+ try
+ {
+ return coordinator.execute(query, ConsistencyLevel.QUORUM);
+ }
+ catch (Throwable t)
+ {
+ if (attempt < 10 &&
t.getClass().getSimpleName().equals(ReadTimeoutException.class.getSimpleName()))
+ {
+ Thread.sleep(2000);
+ }
+ else
+ {
+ throw t;
+ }
+ }
+ }
+ }
+
/**
* Test a read reconciliation where the coordinator doesn't have a read
response it needs to apply
* additional mutations to
@@ -146,9 +173,8 @@ public class MutationTrackingReadReconciliationTest extends
TestBaseImpl
awaitNodeAlive(cluster.get(1), cluster.get(3));
awaitNodeDead(cluster.get(1), cluster.get(2));
-
Assert.assertEquals(0, numLogReconciliations(cluster.get(1)));
- Object[][] result = cluster.coordinator(1).execute(format("SELECT
* FROM %s.%s WHERE k=1", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+ Object[][] result = queryWithRetries(cluster.coordinator(1),
format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName));
Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1)), result);
// check that node3 has the new ids
@@ -225,7 +251,7 @@ public class MutationTrackingReadReconciliationTest extends
TestBaseImpl
Assert.assertEquals(0, numLogReconciliations(cluster.get(1)));
- Object[][] result = cluster.coordinator(3).execute(format("SELECT
* FROM %s.%s WHERE k=1", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+ Object[][] result = queryWithRetries(cluster.coordinator(3),
format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName));
Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1)), result);
// check that node3 has the new ids
@@ -302,7 +328,7 @@ public class MutationTrackingReadReconciliationTest extends
TestBaseImpl
// No reconciliation has happened yet
Assert.assertEquals(0, numLogReconciliations(cluster.get(1)));
- Object[][] result = cluster.coordinator(1).execute(format("SELECT
* FROM %s.%s", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+ Object[][] result = queryWithRetries(cluster.coordinator(1),
format("SELECT * FROM %s.%s", keyspaceName, tableName));
Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1), row(2, 2, 2)),
result);
// Coordinator sends its missing mutations to 3 on read
@@ -377,7 +403,7 @@ public class MutationTrackingReadReconciliationTest extends
TestBaseImpl
Assert.assertEquals(0, numLogReconciliations(cluster.get(3)));
- Object[][] result = cluster.coordinator(3).execute(format("SELECT
* FROM %s.%s", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+ Object[][] result = queryWithRetries(cluster.coordinator(3),
format("SELECT * FROM %s.%s", keyspaceName, tableName));
Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1), row(2, 2, 2)),
result);
// Coordinator sends its missing mutations to 3 on read
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]