This is an automated email from the ASF dual-hosted git repository.
jmckenzie pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f4b69ba0e8 Fix "open RT bound as its last item" exception
f4b69ba0e8 is described below
commit f4b69ba0e82bb051e56a92d792142034d9f617f0
Author: Josh McKenzie <[email protected]>
AuthorDate: Mon Sep 19 14:49:10 2022 -0400
Fix "open RT bound as its last item" exception
Patch by Marcus Eriksson; reviewed by Aleksey Yeschenko and Josh McKenzie
for CASSANDRA-17810
Co-authored-by: Marcus Eriksson <[email protected]>
Co-authored-by: Josh McKenzie <[email protected]>
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/ReadCommand.java | 125 ++++++++++++---------
.../cassandra/db/ReadCommandVerbHandler.java | 26 +++--
.../cassandra/db/transform/RTBoundValidator.java | 45 ++++----
.../exceptions/QueryCancelledException.java | 28 +++++
.../org/apache/cassandra/service/StorageProxy.java | 17 ++-
.../distributed/test/TimeoutAbortTest.java | 62 ++++++++++
.../org/apache/cassandra/db/ReadCommandTest.java | 34 +++++-
8 files changed, 249 insertions(+), 89 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 60e5f976e5..bda1ba04d7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Fix "open RT bound as its last item" exception (CASSANDRA-17810)
* Fix leak of non-standard Java types in JMX MBeans
`org.apache.cassandra.db:type=StorageService`
and `org.apache.cassandra.db:type=RepairService` as clients using JMX
cannot handle them. More details in NEWS.txt (CASSANDRA-17668)
* Deprecate Throwables.propagate usage (CASSANDRA-14218)
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java
b/src/java/org/apache/cassandra/db/ReadCommand.java
index 358d408919..ae64710005 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.exceptions.QueryCancelledException;
import org.apache.cassandra.net.MessageFlag;
import org.apache.cassandra.net.ParamType;
import org.apache.cassandra.net.Verb;
@@ -422,7 +423,8 @@ public abstract class ReadCommand extends AbstractReadQuery
try
{
iterator = withQuerySizeTracking(iterator);
- iterator = withStateTracking(iterator);
+ iterator = maybeSlowDownForTesting(iterator);
+ iterator = withQueryCancellation(iterator);
iterator =
RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs,
executionController), Stage.PURGED, false);
iterator = withMetricsRecording(iterator, cfs.metric,
startTimeNanos);
@@ -601,58 +603,6 @@ public abstract class ReadCommand extends AbstractReadQuery
return Transformation.apply(iter, new MetricRecording());
}
- protected class CheckForAbort extends
StoppingTransformation<UnfilteredRowIterator>
- {
- long lastChecked = 0;
-
- protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator
partition)
- {
- if (maybeAbort())
- {
- partition.close();
- return null;
- }
-
- return Transformation.apply(partition, this);
- }
-
- protected Row applyToRow(Row row)
- {
- if (TEST_ITERATION_DELAY_MILLIS > 0)
- maybeDelayForTesting();
-
- return maybeAbort() ? null : row;
- }
-
- private boolean maybeAbort()
- {
- /**
- * TODO: this is not a great way to abort early; why not expressly
limit checks to 10ms intervals?
- * The value returned by approxTime.now() is updated only every
- * {@link
org.apache.cassandra.utils.MonotonicClock.SampledClock.CHECK_INTERVAL_MS}, by
default 2 millis. Since MonitorableImpl
- * relies on approxTime, we don't need to check unless the
approximate time has elapsed.
- */
- if (lastChecked == approxTime.now())
- return false;
-
- lastChecked = approxTime.now();
-
- if (isAborted())
- {
- stop();
- return true;
- }
-
- return false;
- }
-
- private void maybeDelayForTesting()
- {
- if (!metadata().keyspace.startsWith("system"))
- FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
- }
- }
-
private boolean shouldTrackSize(DataStorageSpec.LongBytesBound
warnThresholdBytes, DataStorageSpec.LongBytesBound abortThresholdBytes)
{
return trackWarnings
@@ -737,9 +687,74 @@ public abstract class ReadCommand extends AbstractReadQuery
return iterator;
}
- protected UnfilteredPartitionIterator
withStateTracking(UnfilteredPartitionIterator iter)
+ private class QueryCancellationChecker extends
StoppingTransformation<UnfilteredRowIterator>
+ {
+ long lastCheckedAt = 0;
+
+ @Override
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator
partition)
+ {
+ maybeCancel();
+ return Transformation.apply(partition, this);
+ }
+
+ @Override
+ protected Row applyToRow(Row row)
+ {
+ maybeCancel();
+ return row;
+ }
+
+ private void maybeCancel()
+ {
+ /*
+ * The value returned by approxTime.now() is updated only every
+ * {@link
org.apache.cassandra.utils.MonotonicClock.SampledClock.CHECK_INTERVAL_MS}, by
default 2 millis.
+ * Since MonitorableImpl relies on approxTime, we don't need to
check unless the approximate time has elapsed.
+ */
+ if (lastCheckedAt == approxTime.now())
+ return;
+ lastCheckedAt = approxTime.now();
+
+ if (isAborted())
+ {
+ stop();
+ throw new QueryCancelledException(ReadCommand.this);
+ }
+ }
+ }
+
+ private UnfilteredPartitionIterator
withQueryCancellation(UnfilteredPartitionIterator iter)
+ {
+ return Transformation.apply(iter, new QueryCancellationChecker());
+ }
+
+ /**
+ * A transformation used for simulating slow queries by tests.
+ */
+ private static class DelayInjector extends
Transformation<UnfilteredRowIterator>
+ {
+ @Override
+ protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator
partition)
+ {
+ FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
+ return Transformation.apply(partition, this);
+ }
+
+ @Override
+ protected Row applyToRow(Row row)
+ {
+ FBUtilities.sleepQuietly(TEST_ITERATION_DELAY_MILLIS);
+ return row;
+ }
+ }
+
+ private UnfilteredPartitionIterator
maybeSlowDownForTesting(UnfilteredPartitionIterator iter)
{
- return Transformation.apply(iter, new CheckForAbort());
+ if (TEST_ITERATION_DELAY_MILLIS > 0 &&
!SchemaConstants.isSystemKeyspace(metadata().keyspace))
+ return Transformation.apply(iter, new DelayInjector());
+ else
+ return iter;
}
/**
diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
index 92265687de..f693bbc7c3 100644
--- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.QueryCancelledException;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.net.IVerbHandler;
import org.apache.cassandra.net.Message;
@@ -77,18 +78,29 @@ public class ReadCommandVerbHandler implements
IVerbHandler<ReadCommand>
MessagingService.instance().send(reply, message.from());
return;
}
+ catch (AssertionError t)
+ {
+ throw new AssertionError(String.format("Caught an error while
trying to process the command: %s", command.toCQLString()), t);
+ }
+ catch (QueryCancelledException e)
+ {
+ logger.debug("Query cancelled (timeout)", e);
+ response = null;
+ assert !command.isCompleted() : "Read marked as completed despite
being aborted by timeout to table " + command.metadata();
+ }
- if (!command.complete())
+ if (command.complete())
+ {
+ Tracing.trace("Enqueuing response to {}", message.from());
+ Message<ReadResponse> reply = message.responseWith(response);
+ reply = MessageParams.addToMessage(reply);
+ MessagingService.instance().send(reply, message.from());
+ }
+ else
{
Tracing.trace("Discarding partial response to {} (timed out)",
message.from());
MessagingService.instance().metrics.recordDroppedMessage(message,
message.elapsedSinceCreated(NANOSECONDS), NANOSECONDS);
- return;
}
-
- Tracing.trace("Enqueuing response to {}", message.from());
- Message<ReadResponse> reply = message.responseWith(response);
- reply = MessageParams.addToMessage(reply);
- MessagingService.instance().send(reply, message.from());
}
private void validateTransientStatus(Message<ReadCommand> message)
diff --git a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
index eb37f4bc1c..e197ce20b7 100644
--- a/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
+++ b/src/java/org/apache/cassandra/db/transform/RTBoundValidator.java
@@ -17,11 +17,12 @@
*/
package org.apache.cassandra.db.transform;
+import java.util.Objects;
+
import org.apache.cassandra.db.DeletionTime;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.RangeTombstoneMarker;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.schema.TableMetadata;
/**
* A validating transformation that sanity-checks the sequence of RT bounds
and boundaries in every partition.
@@ -51,29 +52,27 @@ public final class RTBoundValidator extends
Transformation<UnfilteredRowIterator
public static UnfilteredRowIterator validate(UnfilteredRowIterator
partition, Stage stage, boolean enforceIsClosed)
{
- return Transformation.apply(partition, new RowsTransformation(stage,
partition.metadata(), partition.isReverseOrder(), enforceIsClosed));
+ return Transformation.apply(partition, new RowsTransformation(stage,
partition, enforceIsClosed));
}
@Override
public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator
partition)
{
- return Transformation.apply(partition, new RowsTransformation(stage,
partition.metadata(), partition.isReverseOrder(), enforceIsClosed));
+ return Transformation.apply(partition, new RowsTransformation(stage,
partition, enforceIsClosed));
}
private final static class RowsTransformation extends Transformation
{
private final Stage stage;
- private final TableMetadata metadata;
- private final boolean isReverseOrder;
private final boolean enforceIsClosed;
+ private final UnfilteredRowIterator partition;
private DeletionTime openMarkerDeletionTime;
- private RowsTransformation(Stage stage, TableMetadata metadata,
boolean isReverseOrder, boolean enforceIsClosed)
+ private RowsTransformation(Stage stage, UnfilteredRowIterator
partition, boolean enforceIsClosed)
{
this.stage = stage;
- this.metadata = metadata;
- this.isReverseOrder = isReverseOrder;
+ this.partition = partition;
this.enforceIsClosed = enforceIsClosed;
}
@@ -83,25 +82,25 @@ public final class RTBoundValidator extends
Transformation<UnfilteredRowIterator
if (null == openMarkerDeletionTime)
{
// there is no open RT in the stream - we are expecting a
*_START_BOUND
- if (marker.isClose(isReverseOrder))
- throw ise("unexpected end bound or boundary " +
marker.toString(metadata));
+ if (marker.isClose(partition.isReverseOrder()))
+ throw ise("unexpected end bound or boundary " +
marker.toString(partition.metadata()));
}
else
{
// there is an open RT in the stream - we are expecting a
*_BOUNDARY or an *_END_BOUND
- if (!marker.isClose(isReverseOrder))
- throw ise("start bound followed by another start bound " +
marker.toString(metadata));
+ if (!marker.isClose(partition.isReverseOrder()))
+ throw ise("start bound followed by another start bound " +
marker.toString(partition.metadata()));
// deletion times of open/close markers must match
- DeletionTime deletionTime =
marker.closeDeletionTime(isReverseOrder);
+ DeletionTime deletionTime =
marker.closeDeletionTime(partition.isReverseOrder());
if (!deletionTime.equals(openMarkerDeletionTime))
- throw ise("open marker and close marker have different
deletion times");
+ throw ise("open marker and close marker have different
deletion times, close=" + deletionTime);
openMarkerDeletionTime = null;
}
- if (marker.isOpen(isReverseOrder))
- openMarkerDeletionTime =
marker.openDeletionTime(isReverseOrder);
+ if (marker.isOpen(partition.isReverseOrder()))
+ openMarkerDeletionTime =
marker.openDeletionTime(partition.isReverseOrder());
return marker;
}
@@ -115,9 +114,17 @@ public final class RTBoundValidator extends
Transformation<UnfilteredRowIterator
private IllegalStateException ise(String why)
{
- String message =
- String.format("%s UnfilteredRowIterator for %s has an illegal
RT bounds sequence: %s", stage, metadata, why);
- throw new IllegalStateException(message);
+ throw new IllegalStateException(message(why));
+ }
+
+ private String message(String why)
+ {
+ return String.format("%s UnfilteredRowIterator for %s (key: %s
omdt: [%s]) has an illegal RT bounds sequence: %s",
+ stage,
+ partition.metadata(),
+
partition.metadata().partitionKeyType.getString(partition.partitionKey().getKey()),
+ Objects.toString(openMarkerDeletionTime, "not
present"),
+ why);
}
}
}
diff --git
a/src/java/org/apache/cassandra/exceptions/QueryCancelledException.java
b/src/java/org/apache/cassandra/exceptions/QueryCancelledException.java
new file mode 100644
index 0000000000..45b6334b8d
--- /dev/null
+++ b/src/java/org/apache/cassandra/exceptions/QueryCancelledException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.exceptions;
+
+import org.apache.cassandra.db.ReadCommand;
+
+public class QueryCancelledException extends RuntimeException
+{
+ public QueryCancelledException(ReadCommand command)
+ {
+ super("Query cancelled for taking too long: " + command.toCQLString());
+ }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 4a66b511be..3c20aad9ce 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -44,11 +44,6 @@ import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.service.paxos.Ballot;
-import org.apache.cassandra.service.paxos.Commit;
-import org.apache.cassandra.service.paxos.ContentionStrategy;
-import org.apache.cassandra.service.paxos.Paxos;
-import org.apache.cassandra.service.paxos.PaxosState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,6 +84,7 @@ import
org.apache.cassandra.exceptions.CasWriteUnknownResultException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.IsBootstrappingException;
import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.QueryCancelledException;
import org.apache.cassandra.exceptions.ReadAbortException;
import org.apache.cassandra.exceptions.ReadFailureException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
@@ -126,6 +122,11 @@ import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.ContentionStrategy;
+import org.apache.cassandra.service.paxos.Paxos;
+import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.service.paxos.v1.PrepareCallback;
import org.apache.cassandra.service.paxos.v1.ProposeCallback;
import org.apache.cassandra.service.reads.AbstractReadExecutor;
@@ -2170,6 +2171,12 @@ public class StorageProxy implements StorageProxyMBean
response = command.createEmptyResponse();
readRejected = true;
}
+ catch (QueryCancelledException e)
+ {
+ logger.debug("Query cancelled (timeout)", e);
+ response = null;
+ assert !command.isCompleted() : "Local read marked as
completed despite being aborted by timeout to table " + command.metadata();
+ }
if (command.complete())
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/TimeoutAbortTest.java
b/test/distributed/org/apache/cassandra/distributed/test/TimeoutAbortTest.java
new file mode 100644
index 0000000000..8adf30777f
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/TimeoutAbortTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TimeoutAbortTest extends TestBaseImpl
+{
+ @Test
+ public void timeoutTest() throws IOException, InterruptedException
+ {
+ System.setProperty("cassandra.test.read_iteration_delay_ms", "5000");
+ try (Cluster cluster = init(Cluster.build(1).start()))
+ {
+ cluster.schemaChange(withKeyspace("create table %s.tbl (id int,
ck1 int, ck2 int, d int, primary key (id, ck1, ck2))"));
+ cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl
using timestamp 5 where id = 1 and ck1 = 77 "), ConsistencyLevel.ALL);
+ cluster.get(1).flush(KEYSPACE);
+ Thread.sleep(1000);
+ for (int i = 0; i < 100; i++)
+ cluster.coordinator(1).execute(withKeyspace("insert into
%s.tbl (id, ck1, ck2, d) values (1,77,?,1) using timestamp 10"),
ConsistencyLevel.ALL, i);
+ cluster.get(1).flush(KEYSPACE);
+ boolean caughtException = false;
+ try
+ {
+ cluster.coordinator(1).execute(withKeyspace("select * from
%s.tbl where id=1 and ck1 = 77"), ConsistencyLevel.ALL);
+ }
+ catch (Exception e)
+ {
+
assertTrue(e.toString(), e.getClass().getName().contains("ReadTimeoutException"));
+ caughtException = true;
+ }
+ assertTrue("expected the read to time out", caughtException);
+ List<String> errors =
cluster.get(1).logs().grepForErrors().getResult();
+ assertFalse(errors.toString(), errors.stream().anyMatch(s ->
s.contains("open RT bound")));
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 43a7952175..bf272b87cf 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -56,6 +56,7 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.exceptions.QueryCancelledException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
@@ -232,7 +233,16 @@ public class ReadCommandTest
assertEquals(2, Util.getAll(readCommand).size());
readCommand.abort();
- assertEquals(0, Util.getAll(readCommand).size());
+ boolean cancelled = false;
+ try
+ {
+ Util.getAll(readCommand);
+ }
+ catch (QueryCancelledException e)
+ {
+ cancelled = true;
+ }
+ assertTrue(cancelled);
}
@Test
@@ -263,7 +273,16 @@ public class ReadCommandTest
assertEquals(2, partitions.get(0).rowCount());
readCommand.abort();
- assertEquals(0, Util.getAll(readCommand).size());
+ boolean cancelled = false;
+ try
+ {
+ Util.getAll(readCommand);
+ }
+ catch (QueryCancelledException e)
+ {
+ cancelled = true;
+ }
+ assertTrue(cancelled);
}
@Test
@@ -294,7 +313,16 @@ public class ReadCommandTest
assertEquals(2, partitions.get(0).rowCount());
readCommand.abort();
- assertEquals(0, Util.getAll(readCommand).size());
+ boolean cancelled = false;
+ try
+ {
+ Util.getAll(readCommand);
+ }
+ catch (QueryCancelledException e)
+ {
+ cancelled = true;
+ }
+ assertTrue(cancelled);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]