This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git
The following commit(s) were added to refs/heads/master by this push: new 0e5f319 Distinguish partition error source cluster 0e5f319 is described below commit 0e5f31929690c6b843d1f64d422daf3e476e35bf Author: Yifan Cai <yifan_...@apple.com> AuthorDate: Mon Aug 24 11:48:40 2020 -0700 Distinguish partition error source cluster - Wraps client error with ClusterSourcedException to distinguish the error source. - Stores `error_source text` in the error details table in metadata. --- .../cassandra/diff/ClusterSourcedException.java | 31 ++++++++++++++++++++ .../java/org/apache/cassandra/diff/Differ.java | 15 ++++++++-- .../org/apache/cassandra/diff/JobMetadataDb.java | 22 +++++++++++---- .../apache/cassandra/diff/PartitionComparator.java | 23 ++++++++++++--- .../org/apache/cassandra/diff/RangeComparator.java | 16 +++++++---- .../diff/ClusterSourcedExceptionTest.java | 33 ++++++++++++++++++++++ 6 files changed, 122 insertions(+), 18 deletions(-) diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java b/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java new file mode 100644 index 0000000..5412c65 --- /dev/null +++ b/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java @@ -0,0 +1,31 @@ +package org.apache.cassandra.diff; + +import java.util.concurrent.Callable; + +import org.apache.cassandra.diff.DiffCluster.Type; + +/** + * Wraps the cause with the exception source indicator, {@param type} of the cluster. + * It is used to distinguish driver exceptions among testing clusters. 
+ */ +public class ClusterSourcedException extends RuntimeException { + public final Type exceptionSource; + + ClusterSourcedException(Type exceptionSource, Throwable cause) { + super(cause); + this.exceptionSource = exceptionSource; + } + + public static <T> T catches(Type exceptionSource, Callable<T> callable) { + try { + return callable.call(); + } catch (Exception ex) { + throw new ClusterSourcedException(exceptionSource, ex); + } + } + + @Override + public String getMessage() { + return String.format("from %s - %s", exceptionSource.name(), super.getMessage()); + } +} diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java index f7545e3..1ec78f0 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java @@ -28,6 +28,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.stream.Collectors; @@ -41,6 +42,7 @@ import org.slf4j.LoggerFactory; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; public class Differ implements Serializable @@ -208,9 +210,9 @@ public class Differ implements Serializable final Function<PartitionKey, PartitionComparator> partitionTaskProvider = (key) -> { boolean reverse = context.shouldReverse(); - return new PartitionComparator(context.table, - context.source.getPartition(context.table, key, reverse), - context.target.getPartition(context.table, key, reverse)); + Iterator<Row> source = fetchRows(context, key, reverse, DiffCluster.Type.SOURCE); + Iterator<Row> target = fetchRows(context, key, reverse, DiffCluster.Type.TARGET); + return new 
PartitionComparator(context.table, source, target); }; RangeComparator rangeComparator = new RangeComparator(context, @@ -224,6 +226,13 @@ public class Differ implements Serializable return tableStats; } + private Iterator<Row> fetchRows(DiffContext context, PartitionKey key, boolean shouldReverse, DiffCluster.Type type) { + Callable<Iterator<Row>> rows = () -> type == DiffCluster.Type.SOURCE + ? context.source.getPartition(context.table, key, shouldReverse) + : context.target.getPartition(context.table, key, shouldReverse); + return ClusterSourcedException.catches(type, rows); + } + @VisibleForTesting static Map<KeyspaceTablePair, DiffJob.TaskStatus> filterTables(Iterable<KeyspaceTablePair> keyspaceTables, DiffJob.Split split, diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java index d9d4035..a7247dd 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; +import com.google.common.base.Throwables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -131,8 +132,9 @@ public class JobMetadataDb { " qualified_table_name," + " start_token," + " end_token," + - " error_token)" + - " VALUES (?, ?, ?, ?, ?, ?)", + " error_token," + + " error_source)" + + " VALUES (?, ?, ?, ?, ?, ?, ?)", metadataKeyspace, Schema.ERROR_DETAIL)); } @@ -218,7 +220,15 @@ public class JobMetadataDb { "error for partition with token %s", table, token), error); BatchStatement batch = new BatchStatement(); batch.add(bindErrorSummaryStatement(table)); - batch.add(bindErrorDetailStatement(table, token)); + DiffCluster.Type exceptionSource = null; + int maxRetrace = 10; // In case there is a loop, we do not want to loop forever or throw. So just limit the number of retracing. 
+ for (Throwable t = error; t.getCause() != null && maxRetrace > 0; t = t.getCause(), maxRetrace--) { + if (t instanceof ClusterSourcedException) { + exceptionSource = ((ClusterSourcedException) t).exceptionSource; + break; + } + } + batch.add(bindErrorDetailStatement(table, token, exceptionSource)); batch.setIdempotent(true); session.execute(batch); } @@ -247,8 +257,9 @@ public class JobMetadataDb { .setIdempotent(true); } - private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken) { - return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString()) + private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken, DiffCluster.Type exceptionSource) { + String errorSource = exceptionSource == null ? "" : exceptionSource.name(); + return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString(), errorSource) .setIdempotent(true); } @@ -526,6 +537,7 @@ public class JobMetadataDb { " start_token varchar," + " end_token varchar," + " error_token varchar," + + " error_source varchar," + " PRIMARY KEY ((job_id, bucket, qualified_table_name, start_token, end_token), error_token))" + " WITH default_time_to_live = %s"; diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java index 4214f2b..8aefb49 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java @@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory; import com.datastax.driver.core.ColumnMetadata; import com.datastax.driver.core.Row; +import org.apache.cassandra.diff.DiffCluster.Type; public class PartitionComparator implements Callable<PartitionStats> { @@ -53,10 +54,10 @@ public class PartitionComparator implements Callable<PartitionStats> { return 
partitionStats; } - while (source.hasNext() && target.hasNext()) { + while (hasNextRow(Type.SOURCE) && hasNextRow(Type.TARGET)) { - Row sourceRow = source.next(); - Row targetRow = target.next(); + Row sourceRow = getNextRow(Type.SOURCE); + Row targetRow = getNextRow(Type.TARGET); // if primary keys don't match don't proceed any further, just mark the // partition as mismatched and be done @@ -73,12 +74,26 @@ public class PartitionComparator implements Callable<PartitionStats> { } // if one of the iterators isn't exhausted, then there's a mismatch at the partition level - if (source.hasNext() || target.hasNext()) + if (hasNextRow(Type.SOURCE) || hasNextRow(Type.TARGET)) partitionStats.allClusteringsMatch = false; return partitionStats; } + private boolean hasNextRow(Type type) { + Callable<Boolean> hasNext = () -> type == Type.SOURCE + ? source.hasNext() + : target.hasNext(); + return ClusterSourcedException.catches(type, hasNext); + } + + private Row getNextRow(Type type) { + Callable<Row> next = () -> type == Type.SOURCE + ? source.next() + : target.next(); + return ClusterSourcedException.catches(type, next); + } + private boolean clusteringsEqual(Row source, Row target) { for (ColumnMetadata column : tableSpec.getClusteringColumns()) { Object fromSource = source.getObject(column.getName()); diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java index bb5e937..5d6710e 100644 --- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java +++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java @@ -125,8 +125,7 @@ public class RangeComparator { // unavailables occurring when performing the initial query to read the full partition. // Errors thrown when paging through the partition in comparisonTask will be handled by // the onError callback. 
- rangeStats.partitionError(); - errorReporter.accept(t, token); + recordError(rangeStats, token, errorReporter, t); } finally { // if the cluster has been shutdown because the task failed the underlying iterators // of partition keys will return hasNext == false @@ -224,10 +223,15 @@ public class RangeComparator { private Consumer<Throwable> onError(final RangeStats rangeStats, final BigInteger currentToken, final BiConsumer<Throwable, BigInteger> errorReporter) { - return (error) -> { - rangeStats.partitionError(); - errorReporter.accept(error, currentToken); - }; + return (error) -> recordError(rangeStats, currentToken, errorReporter, error); + } + + private void recordError(final RangeStats rangeStats, + final BigInteger currentToken, + final BiConsumer<Throwable, BigInteger> errorReporter, + final Throwable error) { + rangeStats.partitionError(); + errorReporter.accept(error, currentToken); } } diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java new file mode 100644 index 0000000..e96c96d --- /dev/null +++ b/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java @@ -0,0 +1,33 @@ +package org.apache.cassandra.diff; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import org.hamcrest.CoreMatchers; +import org.hamcrest.CustomMatcher; + +public class ClusterSourcedExceptionTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testCatchesExceptionHasExceptionSourceInfo() { + expectedException.expect(ClusterSourcedException.class); + expectedException.expectCause(CoreMatchers.isA(RuntimeException.class)); + expectedException.expectMessage("from SOURCE"); + expectedException.expect(new CustomMatcher<ClusterSourcedException>("matches the expected exceptionSource: SOURCE") { + @Override + public boolean 
matches(Object item) { + if (item instanceof ClusterSourcedException) { + ClusterSourcedException ex = (ClusterSourcedException) item; + return ex.exceptionSource == DiffCluster.Type.SOURCE; + } + return false; + } + }); + ClusterSourcedException.catches(DiffCluster.Type.SOURCE, () -> { + throw new RuntimeException(); + }); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org