This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c4b1c0614e Read/Write/Truncate throw RequestFailure in a race
condition with callback timeouts, should return Timeout instead
c4b1c0614e is described below
commit c4b1c0614e42b4ea2064822d31c28aa5d4f1450a
Author: David Capwell <[email protected]>
AuthorDate: Fri Aug 19 16:42:56 2022 -0700
Read/Write/Truncate throw RequestFailure in a race condition with callback
timeouts, should return Timeout instead
patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-17828
---
CHANGES.txt | 1 +
.../org/apache/cassandra/net/RequestCallback.java | 17 ++
.../service/AbstractWriteResponseHandler.java | 40 ++--
.../cassandra/service/TruncateResponseHandler.java | 29 ++-
.../cassandra/service/reads/ReadCallback.java | 13 +-
.../test/metrics/RequestTimeoutTest.java | 241 +++++++++++++++++++++
.../org/apache/cassandra/utils/AssertionUtils.java | 124 +++++++++++
.../apache/cassandra/utils/AssertionUtilsTest.java | 45 ++++
8 files changed, 483 insertions(+), 27 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 36beb3c27f..3fd1a8c747 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Read/Write/Truncate throw RequestFailure in a race condition with callback
timeouts, should return Timeout instead (CASSANDRA-17828)
* Add ability to log load profiles at fixed intervals (CASSANDRA-17821)
* Protect against Gossip backing up due to a quarantined endpoint without
version information (CASSANDRA-17830)
* NPE in org.apache.cassandra.cql3.Attributes.getTimeToLive (CASSANDRA-17822)
diff --git a/src/java/org/apache/cassandra/net/RequestCallback.java
b/src/java/org/apache/cassandra/net/RequestCallback.java
index bd14cae1d0..14e0169b85 100644
--- a/src/java/org/apache/cassandra/net/RequestCallback.java
+++ b/src/java/org/apache/cassandra/net/RequestCallback.java
@@ -17,6 +17,8 @@
*/
package org.apache.cassandra.net;
+import java.util.Map;
+
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
@@ -63,4 +65,19 @@ public interface RequestCallback<T>
return false;
}
+ static boolean isTimeout(Map<InetAddressAndPort, RequestFailureReason>
failureReasonByEndpoint)
+ {
+ // The reason that all must be timeout to be called a timeout is as
follows
+ // Assume RF=6, QUORUM, and failureReasonByEndpoint.size() == 3
+ // R1 -> TIMEOUT
+ // R2 -> TIMEOUT
+ // R3 -> READ_TOO_MANY_TOMBSTONES
+ // Since we got a reply back, and that was a failure, we should return
a failure letting the user know.
+ // When all failures are a timeout, then this is a race condition with
+ // org.apache.cassandra.utils.concurrent.Awaitable.await(long,
java.util.concurrent.TimeUnit)
+ // The race is that the message expire path runs and expires all
messages, this then casues the condition
+ // to signal telling the caller "got all replies!".
+ return
failureReasonByEndpoint.values().stream().allMatch(RequestFailureReason.TIMEOUT::equals);
+ }
+
}
diff --git
a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 4d75f19bca..76ad4c2ff8 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -17,12 +17,16 @@
*/
package org.apache.cassandra.service;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -113,34 +117,42 @@ public abstract class AbstractWriteResponseHandler<T>
implements RequestCallback
{
long timeoutNanos = currentTimeoutNanos();
- boolean success;
+ boolean signaled;
try
{
- success = condition.await(timeoutNanos, NANOSECONDS);
+ signaled = condition.await(timeoutNanos, NANOSECONDS);
}
catch (InterruptedException e)
{
throw new UncheckedInterruptedException(e);
}
- if (!success)
- {
- int blockedFor = blockFor();
- int acks = ackCount();
- // It's pretty unlikely, but we can race between exiting await
above and here, so
- // that we could now have enough acks. In that case, we "lie" on
the acks count to
- // avoid sending confusing info to the user (see CASSANDRA-6491).
- if (acks >= blockedFor)
- acks = blockedFor - 1;
- throw new WriteTimeoutException(writeType,
replicaPlan.consistencyLevel(), acks, blockedFor);
- }
+ if (!signaled)
+ throwTimeout();
if (blockFor() + failures > candidateReplicaCount())
{
- throw new WriteFailureException(replicaPlan.consistencyLevel(),
ackCount(), blockFor(), writeType, failureReasonByEndpoint);
+ if
(RequestCallback.isTimeout(this.failureReasonByEndpoint.keySet().stream()
+
.filter(this::waitingFor) // DatacenterWriteResponseHandler filters errors from
remote DCs
+
.collect(Collectors.toMap(Function.identity(),
this.failureReasonByEndpoint::get))))
+ throwTimeout();
+
+ throw new WriteFailureException(replicaPlan.consistencyLevel(),
ackCount(), blockFor(), writeType, this.failureReasonByEndpoint);
}
}
+ private void throwTimeout()
+ {
+ int blockedFor = blockFor();
+ int acks = ackCount();
+ // It's pretty unlikely, but we can race between exiting await above
and here, so
+ // that we could now have enough acks. In that case, we "lie" on the
acks count to
+ // avoid sending confusing info to the user (see CASSANDRA-6491).
+ if (acks >= blockedFor)
+ acks = blockedFor - 1;
+ throw new WriteTimeoutException(writeType,
replicaPlan.consistencyLevel(), acks, blockedFor);
+ }
+
public final long currentTimeoutNanos()
{
long requestTimeout = writeType == COUNTER
diff --git a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
index 984ba5a10a..54b1241006 100644
--- a/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TruncateResponseHandler.java
@@ -17,7 +17,9 @@
*/
package org.apache.cassandra.service;
-import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -46,7 +48,7 @@ public class TruncateResponseHandler implements
RequestCallback<TruncateResponse
private final int responseCount;
protected final AtomicInteger responses = new AtomicInteger(0);
private final long start;
- private volatile InetAddress truncateFailingReplica;
+ private final Map<InetAddressAndPort, RequestFailureReason>
failureReasonByEndpoint = new ConcurrentHashMap<>();
public TruncateResponseHandler(int responseCount)
{
@@ -61,24 +63,31 @@ public class TruncateResponseHandler implements
RequestCallback<TruncateResponse
public void get() throws TimeoutException
{
long timeoutNanos = getTruncateRpcTimeout(NANOSECONDS) - (nanoTime() -
start);
- boolean completedInTime;
+ boolean signaled;
try
{
- completedInTime = condition.await(timeoutNanos, NANOSECONDS); //
TODO truncate needs a much longer timeout
+ signaled = condition.await(timeoutNanos, NANOSECONDS); // TODO
truncate needs a much longer timeout
}
catch (InterruptedException e)
{
throw new UncheckedInterruptedException(e);
}
- if (!completedInTime)
- {
+ if (!signaled)
throw new TimeoutException("Truncate timed out - received only " +
responses.get() + " responses");
- }
- if (truncateFailingReplica != null)
+ if (!failureReasonByEndpoint.isEmpty())
{
- throw new TruncateException("Truncate failed on replica " +
truncateFailingReplica);
+ // clone to make sure no race condition happens
+ Map<InetAddressAndPort, RequestFailureReason>
failureReasonByEndpoint = new HashMap<>(this.failureReasonByEndpoint);
+ if (RequestCallback.isTimeout(failureReasonByEndpoint))
+ throw new TimeoutException("Truncate timed out - received only
" + responses.get() + " responses");
+
+ StringBuilder sb = new StringBuilder("Truncate failed on ");
+ for (Map.Entry<InetAddressAndPort, RequestFailureReason> e :
failureReasonByEndpoint.entrySet())
+ sb.append("replica ").append(e.getKey()).append(" ->
").append(e.getValue()).append(", ");
+ sb.setLength(sb.length() - 2);
+ throw new TruncateException(sb.toString());
}
}
@@ -94,7 +103,7 @@ public class TruncateResponseHandler implements
RequestCallback<TruncateResponse
public void onFailure(InetAddressAndPort from, RequestFailureReason
failureReason)
{
// If the truncation hasn't succeeded on some replica, abort and
indicate this back to the client.
- truncateFailingReplica = from.getAddress();
+ failureReasonByEndpoint.put(from, failureReason);
condition.signalAll();
}
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index e69e6bd2b9..c25b1f0f02 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.service.reads;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -120,6 +121,12 @@ public class ReadCallback<E extends Endpoints<E>, P
extends ReplicaPlan.ForRead<
*/
int received = resolver.responses.size();
boolean failed = failures > 0 && (blockFor > received ||
!resolver.isDataPresent());
+ // If all messages came back as a TIMEOUT then signaled=true and
failed=true.
+ // Need to distinguish between a timeout and a failure (network, bad
data, etc.), so store an extra field.
+ // see CASSANDRA-17828
+ boolean timedout = !signaled;
+ if (failed)
+ timedout = RequestCallback.isTimeout(new
HashMap<>(failureReasonByEndpoint));
WarningContext warnings = warningContext;
// save the snapshot so abort state is not changed between now and
when mayAbort gets called
WarningsSnapshot snapshot = null;
@@ -138,19 +145,19 @@ public class ReadCallback<E extends Endpoints<E>, P
extends ReplicaPlan.ForRead<
if (isTracing())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? "
(including data)" : " (only digests)") : "";
- Tracing.trace("{}; received {} of {} responses{}", failed ?
"Failed" : "Timed out", received, blockFor, gotData);
+ Tracing.trace("{}; received {} of {} responses{}", !timedout ?
"Failed" : "Timed out", received, blockFor, gotData);
}
else if (logger.isDebugEnabled())
{
String gotData = received > 0 ? (resolver.isDataPresent() ? "
(including data)" : " (only digests)") : "";
- logger.debug("{}; received {} of {} responses{}", failed ?
"Failed" : "Timed out", received, blockFor, gotData);
+ logger.debug("{}; received {} of {} responses{}", !timedout ?
"Failed" : "Timed out", received, blockFor, gotData);
}
if (snapshot != null)
snapshot.maybeAbort(command, replicaPlan().consistencyLevel(),
received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint);
// Same as for writes, see AbstractWriteResponseHandler
- throw failed
+ throw !timedout
? new ReadFailureException(replicaPlan().consistencyLevel(),
received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
: new ReadTimeoutException(replicaPlan().consistencyLevel(),
received, blockFor, resolver.isDataPresent());
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java
b/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java
new file mode 100644
index 0000000000..2799fca66a
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/metrics/RequestTimeoutTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.metrics;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import net.bytebuddy.implementation.bind.annotation.SuperMethod;
+import net.bytebuddy.implementation.bind.annotation.This;
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.utils.AssertionUtils;
+import org.apache.cassandra.exceptions.CasWriteTimeoutException;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.paxos.Paxos;
+import org.apache.cassandra.utils.concurrent.Awaitable;
+import org.apache.cassandra.utils.concurrent.Condition;
+import org.assertj.core.api.Assertions;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.utils.AssertionUtils.isThrowable;
+
+public class RequestTimeoutTest extends TestBaseImpl
+{
+ private static final AtomicInteger NEXT = new AtomicInteger(0);
+ public static final int COORDINATOR = 1;
+ private static Cluster CLUSTER;
+
+ @BeforeClass
+ public static void init() throws IOException
+ {
+ CLUSTER = Cluster.build(3)
+ .withConfig(c -> c.set("truncate_request_timeout",
"10s"))
+ .withInstanceInitializer(BB::install)
+ .start();
+ init(CLUSTER);
+ CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.tbl (pk int PRIMARY
KEY, v int)"));
+ }
+
+ @AfterClass
+ public static void cleanup()
+ {
+ if (CLUSTER != null)
+ CLUSTER.close();
+ }
+
+ @Before
+ public void before()
+ {
+ CLUSTER.get(COORDINATOR).runOnInstance(() ->
MessagingService.instance().callbacks.unsafeClear());
+ CLUSTER.filters().reset();
+ BB.reset();
+ }
+
+ @Test
+ public void insert()
+ {
+ CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk,
v) VALUES (?, ?)"), ConsistencyLevel.ALL, NEXT.getAndIncrement(),
NEXT.getAndIncrement()))
+ .is(isThrowable(WriteTimeoutException.class));
+ BB.assertIsTimeoutTrue();
+ }
+
+ @Test
+ public void update()
+ {
+ CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("UPDATE %s.tbl SET v=?
WHERE pk=?"), ConsistencyLevel.ALL, NEXT.getAndIncrement(),
NEXT.getAndIncrement()))
+ .is(isThrowable(WriteTimeoutException.class));
+ BB.assertIsTimeoutTrue();
+ }
+
+ @Test
+ public void batchInsert()
+ {
+ CLUSTER.filters().verbs(Verb.MUTATION_REQ.id).to(2).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(batch(withKeyspace("INSERT INTO %s.tbl
(pk, v) VALUES (?, ?)")), ConsistencyLevel.ALL, NEXT.getAndIncrement(),
NEXT.getAndIncrement()))
+ .is(isThrowable(WriteTimeoutException.class));
+ BB.assertIsTimeoutTrue();
+ }
+
+ @Test
+ public void rangeSelect()
+ {
+ CLUSTER.filters().verbs(Verb.RANGE_REQ.id).to(2).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl"),
ConsistencyLevel.ALL))
+ .is(isThrowable(ReadTimeoutException.class));
+ BB.assertIsTimeoutTrue();
+ }
+
+ @Test
+ public void select()
+ {
+ CLUSTER.filters().verbs(Verb.READ_REQ.id).to(2).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl
WHERE pk=?"), ConsistencyLevel.ALL, NEXT.getAndIncrement()))
+ .is(isThrowable(ReadTimeoutException.class));
+ BB.assertIsTimeoutTrue();
+ }
+
+ @Test
+ public void truncate()
+ {
+ CLUSTER.filters().verbs(Verb.TRUNCATE_REQ.id).to(2).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("TRUNCATE %s.tbl"),
ConsistencyLevel.ALL))
+ .is(AssertionUtils.rootCauseIs(TimeoutException.class));
+ BB.assertIsTimeoutTrue();
+ }
+
+ // don't call BB.assertIsTimeoutTrue(); for CAS, as it has its own logic
+
+ @Test
+ public void casV2PrepareInsert()
+ {
+ withPaxos(Config.PaxosVariant.v2);
+
+ CLUSTER.filters().verbs(Verb.PAXOS2_PREPARE_REQ.id).to(2, 3).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk,
v) VALUES (?, ?) IF NOT EXISTS"), ConsistencyLevel.ALL, NEXT.getAndIncrement(),
NEXT.getAndIncrement()))
+ .is(isThrowable(CasWriteTimeoutException.class));
+ }
+
+ @Test
+ public void casV2PrepareSelect()
+ {
+ withPaxos(Config.PaxosVariant.v2);
+
+ CLUSTER.filters().verbs(Verb.PAXOS2_PREPARE_REQ.id).to(2, 3).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("SELECT * FROM %s.tbl
WHERE pk=?"), ConsistencyLevel.SERIAL, NEXT.getAndIncrement()))
+ .is(isThrowable(ReadTimeoutException.class)); // why does
write have its own type but not read?
+ }
+
+ @Test
+ public void casV2CommitInsert()
+ {
+ withPaxos(Config.PaxosVariant.v2);
+
+ CLUSTER.filters().verbs(Verb.PAXOS_COMMIT_REQ.id).to(2, 3).drop();
+ Assertions.assertThatThrownBy(() ->
CLUSTER.coordinator(COORDINATOR).execute(withKeyspace("INSERT INTO %s.tbl (pk,
v) VALUES (?, ?) IF NOT EXISTS"), ConsistencyLevel.ALL, NEXT.getAndIncrement(),
NEXT.getAndIncrement()))
+ .is(isThrowable(CasWriteTimeoutException.class));
+ }
+
+ private static void withPaxos(Config.PaxosVariant variant)
+ {
+ CLUSTER.forEach(i -> i.runOnInstance(() ->
Paxos.setPaxosVariant(variant)));
+ }
+
+ private static String batch(String cql)
+ {
+ return "BEGIN " + BatchStatement.Type.UNLOGGED.name() + " BATCH\n" +
cql + "\nAPPLY BATCH";
+ }
+
+ public static class BB
+ {
+ public static void install(ClassLoader cl, int num)
+ {
+ if (num != COORDINATOR)
+ return;
+ new ByteBuddy().rebase(Condition.Async.class)
+ .method(named("await").and(takesArguments(2)))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+
+ new ByteBuddy().rebase(RequestCallback.class)
+ .method(named("isTimeout"))
+ .intercept(MethodDelegation.to(BB.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static boolean await(long time, TimeUnit units, @This Awaitable
self, @SuperMethod Method method) throws InterruptedException,
InvocationTargetException, IllegalAccessException
+ {
+ // make sure that the underline condition is met before returnning
true
+ // this way its know that the timeouts triggered!
+ while (!((boolean) method.invoke(self, time, units)))
+ {
+ }
+ return true;
+ }
+
+ private static final AtomicInteger TIMEOUTS = new AtomicInteger(0);
+ public static boolean isTimeout(Map<InetAddressAndPort,
RequestFailureReason> failureReasonByEndpoint, @SuperCall Callable<Boolean> fn)
throws Exception
+ {
+ boolean timeout = fn.call();
+ if (timeout)
+ TIMEOUTS.incrementAndGet();
+ return timeout;
+ }
+
+ public static void assertIsTimeoutTrue()
+ {
+ int timeouts = CLUSTER.get(COORDINATOR).callOnInstance(() ->
TIMEOUTS.getAndSet(0));
+ Assertions.assertThat(timeouts).isGreaterThan(0);
+ }
+
+ public static void reset()
+ {
+ CLUSTER.get(COORDINATOR).runOnInstance(() -> TIMEOUTS.set(0));
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java
b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
new file mode 100644
index 0000000000..d5b1981fc1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import com.google.common.base.Throwables;
+
+import org.assertj.core.api.Condition;
+
+public class AssertionUtils
+{
+ private AssertionUtils()
+ {
+ }
+
+ /**
+ * When working with jvm-dtest the thrown error is in a different {@link
ClassLoader} causing type checks
+ * to fail; this method relies on naming instead.
+ */
+ public static Condition<Object> is(Class<?> klass)
+ {
+ String name = klass.getCanonicalName();
+ return new Condition<Object>() {
+ @Override
+ public boolean matches(Object value)
+ {
+ return value.getClass().getCanonicalName().equals(name);
+ }
+
+ @Override
+ public String toString()
+ {
+ return name;
+ }
+ };
+ }
+
+ public static <T extends Throwable> Condition<Throwable>
isThrowable(Class<T> klass)
+ {
+ // org.assertj.core.api.AbstractAssert.is has <? super ? extends
Throwable> which blocks <T>, so need to
+ // always return Throwable
+ return (Condition<Throwable>) (Condition<?>) is(klass);
+ }
+
+ /**
+ * When working with jvm-dtest the thrown error is in a different {@link
ClassLoader} causing type checks
+ * to fail; this method relies on naming instead.
+ *
+ * This method is different than {@link #is(Class)} as it tries to mimic
instanceOf rather than equality.
+ */
+ public static Condition<Object> isInstanceof(Class<?> klass)
+ {
+ String name = klass.getCanonicalName();
+ return new Condition<Object>() {
+ @Override
+ public boolean matches(Object value)
+ {
+ if (value == null)
+ return false;
+ return matches(value.getClass());
+ }
+
+ private boolean matches(Class<?> input)
+ {
+ for (Class<?> klass = input; klass != null; klass =
klass.getSuperclass())
+ {
+ // extends
+ if (klass.getCanonicalName().equals(name))
+ return true;
+ // implements
+ for (Class<?> i : klass.getInterfaces())
+ {
+ if (matches(i))
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public String toString()
+ {
+ return name;
+ }
+ };
+ }
+
+ public static Condition<Throwable> rootCause(Condition<Throwable> other)
+ {
+ return new Condition<Throwable>() {
+ @Override
+ public boolean matches(Throwable value)
+ {
+ return other.matches(Throwables.getRootCause(value));
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Root cause " + other;
+ }
+ };
+ }
+
+ public static Condition<Throwable> rootCauseIs(Class<? extends Throwable>
klass)
+ {
+ return rootCause((Condition<Throwable>) (Condition<?>) is(klass));
+ }
+}
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java
b/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java
new file mode 100644
index 0000000000..e3ec93ab48
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtilsTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import org.junit.Test;
+
+import org.assertj.core.api.Assertions;
+
+public class AssertionUtilsTest
+{
+ @Test
+ public void isInstanceof()
+ {
+ Assertions.assertThat(new C())
+ .is(AssertionUtils.isInstanceof(A.class));
+
+ Assertions.assertThat(new D())
+ .is(AssertionUtils.isInstanceof(A.class))
+ .is(AssertionUtils.isInstanceof(B.class));
+
+ Assertions.assertThat(null instanceof A)
+
.isEqualTo(AssertionUtils.isInstanceof(A.class).matches(null));
+ }
+
+ interface A {}
+ interface B extends A {}
+ static class C implements A {}
+ static class D implements B {}
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]