This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/cassandra-diff.git
The following commit(s) were added to refs/heads/master by this push:
new e9782c6 Support running diff on multiple keyspaces
e9782c6 is described below
commit e9782c6b59a0888e4c63248ef95468e6b176406d
Author: Yifan Cai <[email protected]>
AuthorDate: Fri May 15 13:57:48 2020 -0700
Support running diff on multiple keyspaces
Patch by Yifan Cai; reviewed by marcuse for CASSANDRA-15807
closes #8
---
README.md | 17 ++-
.../cassandra/diff/api/services/DBService.java | 61 ++++----
common/pom.xml | 7 +
.../apache/cassandra/diff/JobConfiguration.java | 4 +-
.../apache/cassandra/diff/KeyspaceTablePair.java | 61 ++++++++
.../cassandra/diff/YamlJobConfiguration.java | 14 +-
.../cassandra/diff/YamlJobConfigurationTest.java | 16 ++
.../src/test/resources/testconfig.yaml | 10 +-
...onfig.yaml => localconfig-multi-keyspaces.yaml} | 9 +-
spark-job/localconfig.yaml | 8 +-
.../apache/cassandra/diff/ComparisonExecutor.java | 12 +-
.../org/apache/cassandra/diff/DiffCluster.java | 95 +++++++-----
.../java/org/apache/cassandra/diff/DiffJob.java | 57 ++++----
.../java/org/apache/cassandra/diff/Differ.java | 98 ++++++-------
.../org/apache/cassandra/diff/JobMetadataDb.java | 161 +++++++++++----------
.../apache/cassandra/diff/PartitionComparator.java | 5 +-
.../org/apache/cassandra/diff/PartitionKey.java | 5 +-
.../org/apache/cassandra/diff/RangeComparator.java | 8 +-
.../java/org/apache/cassandra/diff/RangeStats.java | 5 +-
.../java/org/apache/cassandra/diff/TableSpec.java | 37 +++--
.../org/apache/cassandra/diff/DiffJobTest.java | 3 -
.../java/org/apache/cassandra/diff/DifferTest.java | 39 +++--
.../cassandra/diff/PartitionComparatorTest.java | 5 +-
23 files changed, 427 insertions(+), 310 deletions(-)
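Note on the config change driving most of the diff below: the single `keyspace` key plus `tables` list is replaced by one `keyspace_tables` list of dot-qualified names. A minimal sketch of the new shape (every other option elided; see the spark-job/localconfig.yaml hunk further down for the real file):

    # List of keyspace.tables to diff
    keyspace_tables:
      - keyspace1.standard1
      - keyspace2.standard1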
diff --git a/README.md b/README.md
index 5fab877..c17ef59 100644
--- a/README.md
+++ b/README.md
@@ -41,9 +41,15 @@ $ cd cassandra-diff
$ mvn package
$ docker run --name cas-src -d -p 9042:9042 cassandra:3.0.18
$ docker run --name cas-tgt -d -p 9043:9042 cassandra:latest
-$ docker exec cas-src cassandra-stress write n=1k
-$ docker exec cas-tgt cassandra-stress write n=1k
+# Create 1000 rows in table "standard1" under keyspace "keyspace1"
+$ docker exec cas-src cassandra-stress write n=1k -schema keyspace="keyspace1"
+$ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace1"
+# Optionally, create another 1000 rows in table "standard1" under keyspace "keyspace2"
+$ docker exec cas-src cassandra-stress write n=1k -schema keyspace="keyspace2"
+$ docker exec cas-tgt cassandra-stress write n=1k -schema keyspace="keyspace2"
$ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig.yaml
+# If rows are created in "keyspace2", you can use the localconfig-multi-keyspaces.yaml to compare data across multiple keyspaces! See the command below.
+# $ spark-submit --verbose --files ./spark-job/localconfig-multi-keyspaces.yaml --class org.apache.cassandra.diff.DiffJob spark-uberjar/target/spark-uberjar-0.2-SNAPSHOT.jar localconfig-multi-keyspaces.yaml
# ... logs
INFO DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 }
## start api-server:
@@ -55,9 +61,8 @@ $ curl -s localhost:8089/jobs/recent | python -mjson.tool
{
"jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
"buckets": 100,
- "keyspace": "keyspace1",
- "tables": [
- "standard1"
+ "keyspace_tables": [
+ "keyspace1.standard1"
],
"sourceClusterName": "local_test_1",
"sourceClusterDesc": "ContactPoints Cluster: name=name,
dc=datacenter1, contact points= [127.0.0.1]",
@@ -71,7 +76,7 @@ $ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | pyt
[
{
"jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
- "table": "standard1",
+ "table": "keyspace1.standard1",
"matchedPartitions": 1000,
"mismatchedPartitions": 0,
"matchedRows": 1000,
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
index 1bf1f88..0a9707a 100644
--- a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
@@ -80,8 +80,7 @@ public class DBService implements Closeable {
" job_id," +
" job_start_time," +
" buckets," +
- " keyspace_name, " +
- " table_names, " +
+ " qualified_table_names, " +
" source_cluster_name," +
" source_cluster_desc," +
" target_cluster_name," +
@@ -92,7 +91,7 @@ public class DBService implements Closeable {
jobResultStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
- " table_name, " +
+ " qualified_table_name, " +
" matched_partitions," +
" mismatched_partitions," +
" matched_rows," +
@@ -102,12 +101,12 @@ public class DBService implements Closeable {
" partitions_only_in_target," +
" skipped_partitions" +
" FROM %s.job_results" +
- " WHERE job_id = ? AND table_name = ?", diffKeyspace));
+ " WHERE job_id = ? AND qualified_table_name = ?", diffKeyspace));
jobStatusStatement = session.prepare(String.format(
" SELECT " +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" completed " +
" FROM %s.job_status" +
" WHERE job_id = ? AND bucket = ?", diffKeyspace));
@@ -115,7 +114,7 @@ public class DBService implements Closeable {
" SELECT " +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" mismatching_token," +
" mismatch_type" +
" FROM %s.mismatches" +
@@ -123,14 +122,14 @@ public class DBService implements Closeable {
jobErrorSummaryStatement = session.prepare(String.format(
" SELECT " +
" count(start_token) AS error_count," +
- " table_name" +
+ " qualified_table_name" +
" FROM %s.task_errors" +
" WHERE job_id = ? AND bucket = ?",
diffKeyspace));
jobErrorRangesStatement = session.prepare(String.format(
" SELECT " +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" start_token," +
" end_token" +
" FROM %s.task_errors" +
@@ -138,10 +137,10 @@ public class DBService implements Closeable {
diffKeyspace));
jobErrorDetailStatement = session.prepare(String.format(
" SELECT " +
- " table_name," +
+ " qualified_table_name," +
" error_token" +
" FROM %s.partition_errors" +
- " WHERE job_id = ? AND bucket = ? AND table_name = ? AND
start_token = ? AND end_token = ?", diffKeyspace));
+ " WHERE job_id = ? AND bucket = ? AND qualified_table_name = ? AND
start_token = ? AND end_token = ?", diffKeyspace));
jobsStartDateStatement = session.prepare(String.format(
" SELECT " +
" job_id" +
@@ -190,8 +189,8 @@ public class DBService implements Closeable {
public Collection<JobResult> fetchJobResults(UUID jobId) {
JobSummary summary = fetchJobSummary(jobId);
- List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.tables.size());
- for (String table : summary.tables)
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.keyspaceTables.size());
+ for (String table : summary.keyspaceTables)
futures.add(session.executeAsync(jobResultStatement.bind(jobId, table)));
SortedSet<JobResult> results = Sets.newTreeSet();
@@ -206,8 +205,8 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobStatusStatement.bind(jobId, i)));
- Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
- processFutures(futures, row -> completedByTable.merge(row.getString("table_name"),
+ Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+ processFutures(futures, row -> completedByTable.merge(row.getString("qualified_table_name"),
row.getLong("completed"),
Long::sum));
return new JobStatus(jobId, completedByTable);
@@ -220,8 +219,8 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobMismatchesStatement.bind(jobId, i)));
- Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
- processFutures(futures, row -> mismatchesByTable.merge(row.getString("table_name"),
+ Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+ processFutures(futures, row -> mismatchesByTable.merge(row.getString("qualified_table_name"),
Lists.newArrayList(new Mismatch(row.getString("mismatching_token"), row.getString("mismatch_type"))),
(l1, l2) -> { l1.addAll(l2); return l1;}));
@@ -235,11 +234,11 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobErrorSummaryStatement.bind(jobId, i)));
- Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
processFutures(futures, row -> {
- String table = row.getString("table_name");
+ String table = row.getString("qualified_table_name");
if (null != table) {
- errorCountByTable.merge(row.getString("table_name"),
+ errorCountByTable.merge(row.getString("qualified_table_name"),
row.getLong("error_count"),
Long::sum);
}
@@ -254,8 +253,8 @@ public class DBService implements Closeable {
for (int i = 0; i < summary.buckets; i++)
futures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
- Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
- processFutures(futures, row -> errorRangesByTable.merge(row.getString("table_name"),
+ Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
+ processFutures(futures, row -> errorRangesByTable.merge(row.getString("qualified_table_name"),
Lists.newArrayList(new Range(row.getString("start_token"), row.getString("end_token"))),
(l1, l2) -> { l1.addAll(l2); return l1;}));
@@ -273,13 +272,13 @@ public class DBService implements Closeable {
processFutures(rangeFutures,
row -> session.executeAsync(jobErrorDetailStatement.bind(jobId,
row.getInt("bucket"),
- row.getString("table_name"),
+ row.getString("qualified_table_name"),
row.getString("start_token"),
row.getString("end_token"))),
errorFutures::add);
- Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.keyspaceTables.size());
processFutures(errorFutures,
- row -> errorsByTable.merge(row.getString("table_name"),
+ row -> errorsByTable.merge(row.getString("qualified_table_name"),
Lists.newArrayList(row.getString("error_token")),
(l1, l2) -> { l1.addAll(l2); return l1;}));
return new JobErrorDetail(jobId, errorsByTable);
@@ -472,7 +471,7 @@ public class DBService implements Closeable {
static JobResult fromRow(Row row) {
return new JobResult(row.getUUID("job_id"),
- row.getString("table_name"),
+ row.getString("qualified_table_name"),
row.getLong("matched_partitions"),
row.getLong("mismatched_partitions"),
row.getLong("matched_rows"),
@@ -494,8 +493,7 @@ public class DBService implements Closeable {
final UUID jobId;
final int buckets;
- final String keyspace;
- final List<String> tables;
+ final List<String> keyspaceTables;
final String sourceClusterName;
final String sourceClusterDesc;
final String targetClusterName;
@@ -509,8 +507,7 @@ public class DBService implements Closeable {
private JobSummary(UUID jobId,
DateTime startTime,
int buckets,
- String keyspace,
- List<String> tables,
+ List<String> keyspaceTables,
String sourceClusterName,
String sourceClusterDesc,
String targetClusterName,
@@ -521,8 +518,7 @@ public class DBService implements Closeable {
this.startTime = startTime;
this.start = startTime.toString();
this.buckets = buckets;
- this.keyspace = keyspace;
- this.tables = tables;
+ this.keyspaceTables = keyspaceTables;
this.sourceClusterName = sourceClusterName;
this.sourceClusterDesc = sourceClusterDesc;
this.targetClusterName = targetClusterName;
@@ -534,8 +530,7 @@ public class DBService implements Closeable {
return new JobSummary(row.getUUID("job_id"),
new DateTime(UUIDs.unixTimestamp(row.getUUID("job_start_time")), DateTimeZone.UTC),
row.getInt("buckets"),
- row.getString("keyspace_name"),
- row.getList("table_names", String.class),
+ row.getList("qualified_table_names", String.class),
row.getString("source_cluster_name"),
row.getString("source_cluster_desc"),
row.getString("target_cluster_name"),
diff --git a/common/pom.xml b/common/pom.xml
index a25cd10..d877ff7 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -49,6 +49,13 @@
<artifactId>cassandra-driver-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
index f0cbb36..2d6cb51 100644
--- a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
@@ -26,9 +26,7 @@ import java.util.Optional;
import java.util.UUID;
public interface JobConfiguration extends Serializable {
- String keyspace();
-
- List<String> tables();
+ List<KeyspaceTablePair> keyspaceTables();
int splits();
diff --git a/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java
new file mode 100644
index 0000000..e705d3c
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/KeyspaceTablePair.java
@@ -0,0 +1,61 @@
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+import com.datastax.driver.core.TableMetadata;
+
+public final class KeyspaceTablePair implements Serializable {
+ public final String keyspace;
+ public final String table;
+
+ public static KeyspaceTablePair from(TableMetadata tableMetadata) {
+ return new KeyspaceTablePair(tableMetadata.getKeyspace().getName(), tableMetadata.getName());
+ }
+
+ // Used by Yaml loader
+ public KeyspaceTablePair(String input) {
+ String[] parts = input.trim().split("\\.");
+ assert parts.length == 2 : "Invalid keyspace table pair format";
+ assert parts[0].length() > 0;
+ assert parts[1].length() > 0;
+
+ this.keyspace = parts[0];
+ this.table = parts[1];
+ }
+
+ public KeyspaceTablePair(String keyspace, String table) {
+ this.keyspace = keyspace;
+ this.table = table;
+ }
+
+ public String toCqlValueString() {
+ return String.format("%s.%s", keyspace, table);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("keyspace", keyspace)
+ .add("table", table)
+ .toString();
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(keyspace, table);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+ KeyspaceTablePair that = (KeyspaceTablePair) obj;
+ return Objects.equals(keyspace, that.keyspace)
+ && Objects.equals(table, that.table);
+ }
+}
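For orientation, a small usage sketch of the new class (the values are illustrative): the single-String constructor is the path the yaml loader and the metadata decode path use, and toCqlValueString() is its inverse.

    KeyspaceTablePair pair = new KeyspaceTablePair("keyspace1.standard1");
    // split on "." into the two segments
    assert pair.keyspace.equals("keyspace1") && pair.table.equals("standard1");
    // value equality and hashCode make it usable as a HashMap key
    assert pair.equals(new KeyspaceTablePair("keyspace1", "standard1"));
    // round-trips back to the stored CQL text form
    assert pair.toCqlValueString().equals("keyspace1.standard1");

Since the format checks are plain asserts, a malformed entry only fails fast when the JVM runs with -ea.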
diff --git a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
index 69fc28c..3666e48 100644
--- a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
+++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
@@ -34,8 +34,7 @@ import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor;
public class YamlJobConfiguration implements JobConfiguration {
public int splits = 10000;
- public String keyspace;
- public List<String> tables;
+ public List<KeyspaceTablePair> keyspace_tables;
public int buckets = 100;
public int rate_limit = 10000;
public String job_id = null;
@@ -59,12 +58,8 @@ public class YamlJobConfiguration implements JobConfiguration {
}
}
- public String keyspace() {
- return keyspace;
- }
-
- public List<String> tables() {
- return tables;
+ public List<KeyspaceTablePair> keyspaceTables() {
+ return keyspace_tables;
}
public int splits() {
@@ -127,8 +122,7 @@ public class YamlJobConfiguration implements JobConfiguration {
public String toString() {
return "YamlJobConfiguration{" +
"splits=" + splits +
- ", keyspace='" + keyspace + '\'' +
- ", tables=" + tables +
+ ", keyspace_tables=" + keyspace_tables +
", buckets=" + buckets +
", rate_limit=" + rate_limit +
", job_id='" + job_id + '\'' +
diff --git a/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
new file mode 100644
index 0000000..813f38a
--- /dev/null
+++ b/common/src/test/java/org/apache/cassandra/diff/YamlJobConfigurationTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.diff;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class YamlJobConfigurationTest {
+ @Test
+ public void testLoadYaml() {
+ JobConfiguration jobConfiguration = YamlJobConfiguration.load("src/test/resources/testconfig.yaml");
+ Assert.assertEquals(3, jobConfiguration.keyspaceTables().size());
+ jobConfiguration.keyspaceTables().forEach(kt -> {
+ Assert.assertTrue("Keyspace segment is not loaded correctly",
kt.keyspace.contains("ks"));
+ Assert.assertTrue("Table segment is not loaded correctly",
kt.table.contains("tb"));
+ });
+ }
+}
diff --git a/spark-job/localconfig.yaml b/common/src/test/resources/testconfig.yaml
similarity index 94%
copy from spark-job/localconfig.yaml
copy to common/src/test/resources/testconfig.yaml
index d4f5630..a860e48 100644
--- a/spark-job/localconfig.yaml
+++ b/common/src/test/resources/testconfig.yaml
@@ -1,8 +1,8 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
- - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+ - ks1.tb1
+ - ks1.tb2
+ - ks2.tb3
# This is how many parts we split the full token range in.
# Each of these splits is then compared between the clusters
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig-multi-keyspaces.yaml
similarity index 93%
copy from spark-job/localconfig.yaml
copy to spark-job/localconfig-multi-keyspaces.yaml
index d4f5630..5b18627 100644
--- a/spark-job/localconfig.yaml
+++ b/spark-job/localconfig-multi-keyspaces.yaml
@@ -1,8 +1,7 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
- - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+ - keyspace1.standard1
+ - keyspace2.standard1
# This is how many parts we split the full token range in.
# Each of these splits is then compared between the clusters
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig.yaml
index d4f5630..8271ebd 100644
--- a/spark-job/localconfig.yaml
+++ b/spark-job/localconfig.yaml
@@ -1,8 +1,6 @@
-# Keyspace to diff
-keyspace: keyspace1
-# List of tables to diff
-tables:
- - standard1
+# List of keyspace.tables to diff
+keyspace_tables:
+ - keyspace1.standard1
# This is how many parts we split the full token range in.
# Each of these splits is then compared between the clusters
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
index bdd6488..b86cf69 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
@@ -19,11 +19,19 @@
package org.apache.cassandra.diff;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.Semaphore;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.util.concurrent.*;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
index 4bbd81c..a60b8f7 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
@@ -20,22 +20,48 @@
package org.apache.cassandra.diff;
import java.math.BigInteger;
-import java.util.*;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.AbstractIterator;
-import com.google.common.util.concurrent.*;
-
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.*;
-import com.datastax.driver.core.querybuilder.*;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ClusteringOrder;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import com.datastax.driver.core.querybuilder.Ordering;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
import org.jetbrains.annotations.NotNull;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.asc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.desc;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.gt;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.lte;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.token;
import static org.apache.cassandra.diff.DiffContext.cqlizedString;
-import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
public class DiffCluster implements AutoCloseable
{
@@ -43,12 +69,11 @@ public class DiffCluster implements AutoCloseable
public enum Type {SOURCE, TARGET}
- private final Map<String, PreparedStatement[]> preparedStatements = new HashMap<>();
+ private final Map<KeyspaceTablePair, PreparedStatement[]> preparedStatements = new HashMap<>();
private final ConsistencyLevel consistencyLevel;
public final Cluster cluster;
private final Session session;
private final TokenHelper tokenHelper;
- public final String keyspace;
public final List<BigInteger> tokenList;
public final RateLimiter getPartitionRateLimiter;
@@ -61,7 +86,6 @@ public class DiffCluster implements AutoCloseable
public DiffCluster(Type clusterId,
Cluster cluster,
- String keyspace,
ConsistencyLevel consistencyLevel,
RateLimiter getPartitionRateLimiter,
int tokenScanFetchSize,
@@ -69,7 +93,6 @@ public class DiffCluster implements AutoCloseable
int readTimeoutMillis)
{
- this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.cluster = cluster;
this.tokenHelper =
TokenHelper.forPartitioner(cluster.getMetadata().getPartitioner());
@@ -82,7 +105,7 @@ public class DiffCluster implements AutoCloseable
this.readTimeoutMillis = readTimeoutMillis;
}
- public Iterator<PartitionKey> getPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+ public Iterator<PartitionKey> getPartitionKeys(KeyspaceTablePair table, final BigInteger prevToken, final BigInteger token) {
try {
return Uninterruptibles.getUninterruptibly(fetchPartitionKeys(table, prevToken, token));
}
@@ -93,7 +116,7 @@ public class DiffCluster implements AutoCloseable
}
}
- private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+ private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(KeyspaceTablePair table, final BigInteger prevToken, final BigInteger token) {
BoundStatement statement = keyReader(table).bind(tokenHelper.forBindParam(prevToken), tokenHelper.forBindParam(token));
statement.setFetchSize(tokenScanFetchSize);
@@ -132,10 +155,10 @@ public class DiffCluster implements AutoCloseable
}
}
- private ResultSetFuture readPartition(String table, final PartitionKey key, boolean shouldReverse) {
+ private ResultSetFuture readPartition(KeyspaceTablePair keyspaceTablePair, final PartitionKey key, boolean shouldReverse) {
BoundStatement statement = shouldReverse
- ? reverseReader(table).bind(key.getComponents().toArray())
- : forwardReader(table).bind(key.getComponents().toArray());
+ ? reverseReader(keyspaceTablePair).bind(key.getComponents().toArray())
+ : forwardReader(keyspaceTablePair).bind(key.getComponents().toArray());
statement.setFetchSize(partitionReadFetchSize);
statement.setReadTimeoutMillis(readTimeoutMillis);
getPartitionRateLimiter.acquire();
@@ -153,38 +176,38 @@ public class DiffCluster implements AutoCloseable
cluster.closeAsync();
}
- private PreparedStatement keyReader(String table) {
- return getStatementForTable(table, 0);
+ private PreparedStatement keyReader(KeyspaceTablePair keyspaceTablePair) {
+ return getStatementForTable(keyspaceTablePair, 0);
}
- private PreparedStatement forwardReader(String table) {
- return getStatementForTable(table, 1);
+ private PreparedStatement forwardReader(KeyspaceTablePair keyspaceTablePair) {
+ return getStatementForTable(keyspaceTablePair, 1);
}
- private PreparedStatement reverseReader(String table) {
- return getStatementForTable(table, 2);
+ private PreparedStatement reverseReader(KeyspaceTablePair keyspaceTablePair) {
+ return getStatementForTable(keyspaceTablePair, 2);
}
- private PreparedStatement getStatementForTable(String table, int index) {
- if (!preparedStatements.containsKey(table)) {
+ private PreparedStatement getStatementForTable(KeyspaceTablePair keyspaceTablePair, int index) {
+ if (!preparedStatements.containsKey(keyspaceTablePair)) {
synchronized (this) {
- if (!preparedStatements.containsKey(table)) {
- PreparedStatement keyStatement = getKeyStatement(table);
- PreparedStatement[] partitionReadStmts = getFullStatement(table);
- preparedStatements.put(table, new PreparedStatement[]{ keyStatement ,
- partitionReadStmts[0],
- partitionReadStmts[1] });
+ if (!preparedStatements.containsKey(keyspaceTablePair)) {
+ PreparedStatement keyStatement = getKeyStatement(keyspaceTablePair);
+ PreparedStatement[] partitionReadStmts = getFullStatement(keyspaceTablePair);
+ preparedStatements.put(keyspaceTablePair, new PreparedStatement[]{ keyStatement ,
+ partitionReadStmts[0],
+ partitionReadStmts[1] });
}
}
}
- return preparedStatements.get(table)[index];
+ return preparedStatements.get(keyspaceTablePair)[index];
}
- private PreparedStatement getKeyStatement(@NotNull String table) {
+ private PreparedStatement getKeyStatement(@NotNull KeyspaceTablePair keyspaceTablePair) {
final TableMetadata tableMetadata = session.getCluster()
.getMetadata()
- .getKeyspace(cqlizedString(keyspace))
- .getTable(cqlizedString(table));
+ .getKeyspace(cqlizedString(keyspaceTablePair.keyspace))
+ .getTable(cqlizedString(keyspaceTablePair.table));
String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
Select.Selection selection = QueryBuilder.select().distinct().column(token(partitionKeyColumns));
@@ -199,11 +222,11 @@ public class DiffCluster implements AutoCloseable
return session.prepare(select).setConsistencyLevel(consistencyLevel);
}
- private PreparedStatement[] getFullStatement(@NotNull String table) {
+ private PreparedStatement[] getFullStatement(@NotNull KeyspaceTablePair keyspaceTablePair) {
final TableMetadata tableMetadata = session.getCluster()
.getMetadata()
- .getKeyspace(cqlizedString(keyspace))
- .getTable(cqlizedString(table));
+ .getKeyspace(cqlizedString(keyspaceTablePair.keyspace))
+ .getTable(cqlizedString(keyspaceTablePair.table));
String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
String[] allColumns = columnNames(tableMetadata.getColumns());
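The statement cache in this file now keys on KeyspaceTablePair (possible because the class defines equals/hashCode) and lazily prepares three statements per table: a key reader plus forward and reverse partition readers, guarded by double-checked locking on the HashMap. Purely as a hypothetical alternative, not what the patch does, the same caching idea with java.util.concurrent.ConcurrentHashMap.computeIfAbsent:

    private final ConcurrentHashMap<KeyspaceTablePair, PreparedStatement[]> stmts = new ConcurrentHashMap<>();

    private PreparedStatement statementFor(KeyspaceTablePair pair, int index) {
        // prepares the three statements at most once per table, atomically
        return stmts.computeIfAbsent(pair, p -> {
            PreparedStatement[] readers = getFullStatement(p); // forward and reverse readers
            return new PreparedStatement[] { getKeyStatement(p), readers[0], readers[1] };
        })[index];
    }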
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index 3047c97..4a1faa1 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -21,14 +21,17 @@ package org.apache.cassandra.diff;
import java.io.Serializable;
import java.math.BigInteger;
-import java.util.*;
-import java.util.function.BiConsumer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,13 +70,13 @@ public class DiffJob {
// optional code block to run before a job starts
private Runnable preJobHook;
// optional code block to run after a job completes successfully; otherwise, it is not executed.
- private Consumer<Map<String, RangeStats>> postJobHook;
+ private Consumer<Map<KeyspaceTablePair, RangeStats>> postJobHook;
public void addPreJobHook(Runnable preJobHook) {
this.preJobHook = preJobHook;
}
- public void addPostJobHook(Consumer<Map<String, RangeStats>> postJobHook) {
+ public void addPostJobHook(Consumer<Map<KeyspaceTablePair, RangeStats>> postJobHook) {
this.postJobHook = postJobHook;
}
@@ -132,10 +135,9 @@ public class DiffJob {
targetProvider.getClusterName(),
targetProvider.toString());
- logger.info("DiffJob {} comparing [{}] in keyspace {} on {} and
{}",
+ logger.info("DiffJob {} comparing [{}] on {} and {}",
jobId,
- String.join(",", params.tables),
- params.keyspace,
+ params.keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")),
sourceProvider,
targetProvider);
@@ -143,18 +145,18 @@ public class DiffJob {
preJobHook.run();
// Run the distributed diff and collate results
- Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
- .map((split) -> new Differ(configuration,
- params,
- perExecutorRateLimit,
- split,
- tokenHelper,
- sourceProvider,
- targetProvider,
- metadataProvider,
- new TrackerProvider(configuration.metadataOptions().keyspace))
- .run())
- .reduce(Differ::accumulate);
+ Map<KeyspaceTablePair, RangeStats> diffStats = sc.parallelize(splits, slices)
+ .map((split) -> new Differ(configuration,
+ params,
+ perExecutorRateLimit,
+ split,
+ tokenHelper,
+ sourceProvider,
+ targetProvider,
+ metadataProvider,
+ new TrackerProvider(configuration.metadataOptions().keyspace))
+ .run())
+ .reduce(Differ::accumulate);
// Publish results. This also removes the job from the currently running list
job.finalizeJob(params.jobId, diffStats);
logger.info("FINISHED: {}", diffStats);
@@ -181,8 +183,7 @@ public class DiffJob {
return job.getJobParams(conf.jobId().get());
} else {
return new Params(UUID.randomUUID(),
- conf.keyspace(),
- conf.tables(),
+ conf.keyspaceTables(),
conf.buckets(),
conf.splits());
}
@@ -264,22 +265,20 @@ public class DiffJob {
static class Params implements Serializable {
public final UUID jobId;
- public final String keyspace;
- public final ImmutableList<String> tables;
+ public final ImmutableList<KeyspaceTablePair> keyspaceTables;
public final int buckets;
public final int tasks;
- Params(UUID jobId, String keyspace, List<String> tables, int buckets, int tasks) {
+ Params(UUID jobId, List<KeyspaceTablePair> keyspaceTables, int buckets, int tasks) {
this.jobId = jobId;
- this.keyspace = keyspace;
- this.tables = ImmutableList.copyOf(tables);
+ this.keyspaceTables = ImmutableList.copyOf(keyspaceTables);
this.buckets = buckets;
this.tasks = tasks;
}
public String toString() {
- return String.format("Params: [jobId: %s, keyspace: %s, tables:
%s, buckets: %s, tasks: %s]",
- jobId, keyspace,
tables.stream().collect(Collectors.joining(",")), buckets, tasks);
+ return String.format("Params: [jobId: %s, keyspaceTables: %s,
buckets: %s, tasks: %s]",
+ jobId,
keyspaceTables.stream().map(KeyspaceTablePair::toString).collect(Collectors.joining(",")),
buckets, tasks);
}
}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
index 2272b44..f7545e3 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -23,23 +23,25 @@ import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.math.BigInteger;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
-import java.util.function.BiConsumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Verify;
-import com.google.common.util.concurrent.*;
-
+import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.Session;
-
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
public class Differ implements Serializable
{
@@ -53,8 +55,7 @@ public class Differ implements Serializable
private final UUID jobId;
private final DiffJob.Split split;
private final TokenHelper tokenHelper;
- private final String keyspace;
- private final List<String> tables;
+ private final List<KeyspaceTablePair> keyspaceTables;
private final RateLimiter rateLimiter;
private final DiffJob.TrackerProvider trackerProvider;
private final double reverseReadProbability;
@@ -92,8 +93,7 @@ public class Differ implements Serializable
this.jobId = params.jobId;
this.split = split;
this.tokenHelper = tokenHelper;
- this.keyspace = params.keyspace;
- this.tables = params.tables;
+ this.keyspaceTables = params.keyspaceTables;
this.trackerProvider = trackerProvider;
rateLimiter = RateLimiter.create(perExecutorRateLimit);
this.reverseReadProbability = config.reverseReadProbability();
@@ -110,7 +110,6 @@ public class Differ implements Serializable
{
srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
sourceProvider.getCluster(),
- params.keyspace,
cl,
rateLimiter,
config.tokenScanFetchSize(),
@@ -122,7 +121,6 @@ public class Differ implements Serializable
{
targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
targetProvider.getCluster(),
- params.keyspace,
cl,
rateLimiter,
config.tokenScanFetchSize(),
@@ -138,18 +136,19 @@ public class Differ implements Serializable
}
}
- public Map<String, RangeStats> run() {
+ public Map<KeyspaceTablePair, RangeStats> run() {
JobMetadataDb.ProgressTracker journal = trackerProvider.getTracker(journalSession, jobId, split);
- Map<String, DiffJob.TaskStatus> tablesToDiff = filterTables(tables,
- split,
- journal::getLastStatus,
- !specificTokens.isEmpty());
- String metricsPrefix = String.format("%s.%s", srcDiffCluster.clusterId.name(), srcDiffCluster.keyspace);
+ Map<KeyspaceTablePair, DiffJob.TaskStatus> tablesToDiff = filterTables(keyspaceTables,
+ split,
+ journal::getLastStatus,
+ !specificTokens.isEmpty());
+
+ String metricsPrefix = srcDiffCluster.clusterId.name();
logger.info("Diffing {} for tables {}", split, tablesToDiff);
- for (Map.Entry<String, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
- final String table = tableStatus.getKey();
+ for (Map.Entry<KeyspaceTablePair, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
+ final KeyspaceTablePair keyspaceTablePair = tableStatus.getKey();
DiffJob.TaskStatus status = tableStatus.getValue();
RangeStats diffStats = status.stats;
@@ -160,37 +159,37 @@ public class Differ implements Serializable
BigInteger startToken = status.lastToken == null || isRerun ? split.start : status.lastToken;
validateRange(startToken, split.end, tokenHelper);
- TableSpec sourceTable = TableSpec.make(table, srcDiffCluster);
- TableSpec targetTable = TableSpec.make(table, targetDiffCluster);
+ TableSpec sourceTable = TableSpec.make(keyspaceTablePair, srcDiffCluster);
+ TableSpec targetTable = TableSpec.make(keyspaceTablePair, targetDiffCluster);
validateTableSpecs(sourceTable, targetTable);
DiffContext ctx = new DiffContext(srcDiffCluster,
targetDiffCluster,
- keyspace,
+ keyspaceTablePair.keyspace,
sourceTable,
startToken,
split.end,
specificTokens,
reverseReadProbability);
- String timerName = String.format("%s.%s.split_times", metricsPrefix, table);
+ String timerName = String.format("%s.%s.split_times", metricsPrefix, keyspaceTablePair.table);
try (@SuppressWarnings("unused") Timer.Context timer = metrics.timer(timerName).time()) {
diffStats.accumulate(diffTable(ctx,
- (error, token) -> journal.recordError(table, token, error),
- (type, token) -> journal.recordMismatch(table, type, token),
- (stats, token) -> journal.updateStatus(table, stats, token)));
+ (error, token) -> journal.recordError(keyspaceTablePair, token, error),
+ (type, token) -> journal.recordMismatch(keyspaceTablePair, type, token),
+ (stats, token) -> journal.updateStatus(keyspaceTablePair, stats, token)));
// update the journal with the final state for the table. Use the split's ending token
// as the last seen token (even though we may not have actually read any partition for
// that token) as this effectively marks the split as done.
- journal.finishTable(table, diffStats, !isRerun);
+ journal.finishTable(keyspaceTablePair, diffStats, !isRerun);
}
}
- Map<String, RangeStats> statsByTable = tablesToDiff.entrySet()
- .stream()
- .collect(Collectors.toMap(Map.Entry::getKey,
- e -> e.getValue().stats));
+ Map<KeyspaceTablePair, RangeStats> statsByTable = tablesToDiff.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> e.getValue().stats));
updateMetrics(metricsPrefix, statsByTable);
return statsByTable;
}
@@ -226,25 +225,25 @@ public class Differ implements Serializable
}
@VisibleForTesting
- static Map<String, DiffJob.TaskStatus> filterTables(Iterable<String> tables,
- DiffJob.Split split,
- Function<String, DiffJob.TaskStatus> journal,
- boolean includeCompleted) {
- Map<String, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
- for (String table : tables) {
- DiffJob.TaskStatus taskStatus = journal.apply(table);
+ static Map<KeyspaceTablePair, DiffJob.TaskStatus> filterTables(Iterable<KeyspaceTablePair> keyspaceTables,
+ DiffJob.Split split,
+ Function<KeyspaceTablePair, DiffJob.TaskStatus> journal,
+ boolean includeCompleted) {
+ Map<KeyspaceTablePair, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
+ for (KeyspaceTablePair pair : keyspaceTables) {
+ DiffJob.TaskStatus taskStatus = journal.apply(pair);
RangeStats diffStats = taskStatus.stats;
BigInteger lastToken = taskStatus.lastToken;
// When we finish processing a split for a given table, we update the task status in journal
// to set the last seen token to the split's end token, to indicate that the split is complete.
if (!includeCompleted && lastToken != null && lastToken.equals(split.end)) {
- logger.info("Found finished table {} for split {}", table, split);
+ logger.info("Found finished table {} for split {}", pair, split);
}
else {
- tablesToProcess.put(table, diffStats != null
- ? taskStatus
- : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
+ tablesToProcess.put(pair, diffStats != null
+ ? taskStatus
+ : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
}
}
return tablesToProcess;
@@ -267,9 +266,9 @@ public class Differ implements Serializable
}
@VisibleForTesting
- static Map<String, RangeStats> accumulate(Map<String, RangeStats> stats, Map<String, RangeStats> otherStats)
+ static Map<KeyspaceTablePair, RangeStats> accumulate(Map<KeyspaceTablePair, RangeStats> stats, Map<KeyspaceTablePair, RangeStats> otherStats)
{
- for (Map.Entry<String, RangeStats> otherEntry : otherStats.entrySet())
+ for (Map.Entry<KeyspaceTablePair, RangeStats> otherEntry : otherStats.entrySet())
{
if (stats.containsKey(otherEntry.getKey()))
stats.get(otherEntry.getKey()).accumulate(otherEntry.getValue());
@@ -279,11 +278,12 @@ public class Differ implements Serializable
return stats;
}
- private static void updateMetrics(String prefix, Map<String, RangeStats> statsMap)
+ private static void updateMetrics(String prefix, Map<KeyspaceTablePair, RangeStats> statsMap)
{
- for (Map.Entry<String, RangeStats> entry : statsMap.entrySet())
+ for (Map.Entry<KeyspaceTablePair, RangeStats> entry : statsMap.entrySet())
{
- String qualifier = String.format("%s.%s", prefix, entry.getKey());
+ KeyspaceTablePair keyspaceTablePair = entry.getKey();
+ String qualifier = String.format("%s.%s.%s", prefix, keyspaceTablePair.keyspace, keyspaceTablePair.table);
RangeStats stats = entry.getValue();
metrics.meter(qualifier + ".partitions_read").mark(stats.getMatchedPartitions() + stats.getOnlyInSource() + stats.getOnlyInTarget() + stats.getMismatchedPartitions());
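With updateMetrics receiving KeyspaceTablePair keys, meter names now carry both name segments, so one job spanning several keyspaces cannot collide on a shared table name. A tiny illustration of the resulting names (the pair is hypothetical):

    String prefix = DiffCluster.Type.SOURCE.name();   // metricsPrefix in run()
    KeyspaceTablePair pair = new KeyspaceTablePair("keyspace1", "standard1");
    String qualifier = String.format("%s.%s.%s", prefix, pair.keyspace, pair.table);
    // qualifier == "SOURCE.keyspace1.standard1", giving meters such as
    // "SOURCE.keyspace1.standard1.partitions_read"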
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index 1eb121c..d9d4035 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -20,13 +20,22 @@
package org.apache.cassandra.diff;
import java.math.BigInteger;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
import com.datastax.driver.core.utils.UUIDs;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -40,7 +49,7 @@ public class JobMetadataDb {
private final int bucket;
private final String startToken;
private final String endToken;
- private final String keyspace;
+ private final String metadataKeyspace;
private Session session;
private static PreparedStatement updateStmt;
@@ -53,25 +62,25 @@ public class JobMetadataDb {
int bucket,
BigInteger startToken,
BigInteger endToken,
- String keyspace,
+ String metadataKeyspace,
Session session) {
this.jobId = jobId;
this.bucket = bucket;
this.startToken = startToken.toString();
this.endToken = endToken.toString();
- this.keyspace = keyspace;
+ this.metadataKeyspace = metadataKeyspace;
this.session = session;
}
/**
* Runs on each executor to prepare statements shared across all instances
*/
- public static void initializeStatements(Session session, String keyspace) {
+ public static void initializeStatements(Session session, String metadataKeyspace) {
if (updateStmt == null) {
updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ "
qualified_table_name," +
" start_token," +
" end_token," +
"
matched_partitions," +
@@ -84,47 +93,47 @@ public class JobMetadataDb {
"
skipped_partitions," +
" last_token )" +
"VALUES (?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- keyspace,
Schema.TASK_STATUS));
+ metadataKeyspace,
Schema.TASK_STATUS));
}
if (mismatchStmt == null) {
mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" mismatching_token," +
" mismatch_type )" +
"VALUES (?, ?, ?, ?, ?)",
- keyspace, Schema.MISMATCHES));
+ metadataKeyspace, Schema.MISMATCHES));
}
if (updateCompleteStmt == null) {
updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
" SET completed = completed + 1" +
" WHERE job_id = ? " +
" AND bucket = ? " +
- " AND table_name = ? ",
- keyspace, Schema.JOB_STATUS))
+ " AND qualified_table_name = ? ",
+ metadataKeyspace, Schema.JOB_STATUS))
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
}
if (errorSummaryStmt == null) {
errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" start_token," +
" end_token)" +
" VALUES (?, ?, ?, ?, ?)",
- keyspace, Schema.ERROR_SUMMARY));
+ metadataKeyspace, Schema.ERROR_SUMMARY));
}
if (errorDetailStmt == null) {
errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
" job_id," +
" bucket," +
- " table_name," +
+ " qualified_table_name," +
" start_token," +
" end_token," +
" error_token)" +
" VALUES (?, ?, ?, ?, ?, ?)",
- keyspace, Schema.ERROR_DETAIL));
+ metadataKeyspace, Schema.ERROR_DETAIL));
}
}
@@ -140,10 +149,10 @@ public class JobMetadataDb {
/**
*
- * @param table
+ * @param keyspaceTablePair
* @return
*/
- public DiffJob.TaskStatus getLastStatus(String table) {
+ public DiffJob.TaskStatus getLastStatus(KeyspaceTablePair keyspaceTablePair) {
ResultSet rs = session.execute(String.format("SELECT last_token, "
+
"
matched_partitions, " +
"
mismatched_partitions, " +
@@ -156,11 +165,11 @@ public class JobMetadataDb {
" FROM %s.%s " +
" WHERE job_id = ? " +
" AND bucket = ? " +
- " AND table_name =
? " +
+ " AND
qualified_table_name = ? " +
" AND start_token =
? " +
" AND end_token =
?",
- keyspace,
Schema.TASK_STATUS),
- jobId, bucket, table, startToken,
endToken);
+ metadataKeyspace,
Schema.TASK_STATUS),
+ jobId, bucket,
keyspaceTablePair.toCqlValueString(), startToken, endToken);
Row row = rs.one();
if (null == row)
return DiffJob.TaskStatus.EMPTY;
@@ -185,11 +194,11 @@ public class JobMetadataDb {
* @param diffStats
* @param latestToken
*/
- public void updateStatus(String table, RangeStats diffStats, BigInteger latestToken) {
+ public void updateStatus(KeyspaceTablePair table, RangeStats diffStats, BigInteger latestToken) {
session.execute(bindUpdateStatement(table, diffStats, latestToken));
}
- public void recordMismatch(String table, MismatchType type, BigInteger token) {
+ public void recordMismatch(KeyspaceTablePair table, MismatchType type, BigInteger token) {
logger.info("Detected mismatch in table {}; partition with token {} is {}",
table, token, type == MismatchType.PARTITION_MISMATCH
? " different in source and target clusters"
@@ -204,7 +213,7 @@ public class JobMetadataDb {
* @param token
* @param error
*/
- public void recordError(String table, BigInteger token, Throwable error) {
+ public void recordError(KeyspaceTablePair table, BigInteger token, Throwable error) {
logger.error(String.format("Encountered error during partition comparison in table %s; " +
"error for partition with token %s", table, token), error);
BatchStatement batch = new BatchStatement();
@@ -219,41 +228,41 @@ public class JobMetadataDb {
* @param table
* @param stats
*/
- public void finishTable(String table, RangeStats stats, boolean updateCompletedCount) {
+ public void finishTable(KeyspaceTablePair table, RangeStats stats, boolean updateCompletedCount) {
logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table);
// first flush out the last status.
session.execute(bindUpdateStatement(table, stats, endToken));
// then update the count of completed tasks
if (updateCompletedCount)
- session.execute(updateCompleteStmt.bind(jobId, bucket, table));
+ session.execute(updateCompleteStmt.bind(jobId, bucket, table.toCqlValueString()));
}
- private Statement bindMismatchesStatement(String table, BigInteger token, String type) {
- return mismatchStmt.bind(jobId, bucket, table, token.toString(), type)
+ private Statement bindMismatchesStatement(KeyspaceTablePair table, BigInteger token, String type) {
+ return mismatchStmt.bind(jobId, bucket, table.toCqlValueString(), token.toString(), type)
.setIdempotent(true);
}
- private Statement bindErrorSummaryStatement(String table) {
- return errorSummaryStmt.bind(jobId, bucket, table, startToken, endToken)
+ private Statement bindErrorSummaryStatement(KeyspaceTablePair table) {
+ return errorSummaryStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken)
.setIdempotent(true);
}
- private Statement bindErrorDetailStatement(String table, BigInteger errorToken) {
- return errorDetailStmt.bind(jobId, bucket, table, startToken, endToken, errorToken.toString())
+ private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken) {
+ return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString())
.setIdempotent(true);
}
- private Statement bindUpdateStatement(String table, RangeStats stats, BigInteger token) {
+ private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, BigInteger token) {
return bindUpdateStatement(table, stats, token.toString());
}
- private Statement bindUpdateStatement(String table, RangeStats stats, String token) {
+ private Statement bindUpdateStatement(KeyspaceTablePair table, RangeStats stats, String token) {
// We don't persist the partition error count from RangeStats as errors
// are likely to be transient and not data related, so we don't want to
// accumulate them across runs.
return updateStmt.bind(jobId,
bucket,
- table,
+ table.toCqlValueString(),
startToken,
endToken,
stats.getMatchedPartitions(),
@@ -275,29 +284,32 @@ public class JobMetadataDb {
static class JobLifeCycle {
final Session session;
- final String keyspace;
+ final String metadataKeyspace;
public JobLifeCycle(Session session, String metadataKeyspace) {
this.session = session;
- this.keyspace = metadataKeyspace;
+ this.metadataKeyspace = metadataKeyspace;
}
public DiffJob.Params getJobParams(UUID jobId) {
- ResultSet rs = session.execute(String.format("SELECT keyspace_name, " +
- " table_names," +
+ ResultSet rs = session.execute(String.format("SELECT qualified_table_names," +
" buckets," +
" total_tasks " +
"FROM %s.%s " +
"WHERE job_id = ?",
- keyspace, Schema.JOB_SUMMARY),
+ metadataKeyspace, Schema.JOB_SUMMARY),
jobId);
Row row = rs.one();
if (null == row)
return null;
+ // qualified_table_names is encoded as a List<String>. Decode it back to List<KeyspaceTablePair>.
+ List<KeyspaceTablePair> keyspaceTables = row.getList("qualified_table_names", String.class)
+ .stream()
+ .map(KeyspaceTablePair::new)
+ .collect(Collectors.toList());
return new DiffJob.Params(jobId,
- row.getString("keyspace_name"),
- row.getList("table_names", String.class),
+ keyspaceTables,
row.getInt("buckets"),
row.getInt("total_tasks"));
}
@@ -314,7 +326,7 @@ public class JobMetadataDb {
// The job was previously run, so this could be a re-run to
// mop up any failed splits so mark it in progress.
ResultSet rs = session.execute(String.format("INSERT INTO %s.%s
(job_id) VALUES (?) IF NOT EXISTS",
- keyspace,
Schema.RUNNING_JOBS),
+ metadataKeyspace,
Schema.RUNNING_JOBS),
params.jobId);
if (!rs.one().getBool("[applied]")) {
logger.info("Aborting due to inability to mark job as running.
" +
@@ -330,21 +342,19 @@ public class JobMetadataDb {
" job_id," +
" job_start_time," +
" buckets," +
- " keyspace_name," +
- " table_names," +
+ " qualified_table_names," +
" source_cluster_name," +
" source_cluster_desc," +
" target_cluster_name," +
" target_cluster_desc," +
" total_tasks)" +
- " VALUES (?, ?, ?, ?, ?, ?, ?,
?, ?, ?)" +
+ " VALUES (?, ?, ?, ?, ?, ?, ?,
?, ?)" +
" IF NOT EXISTS",
- keyspace, Schema.JOB_SUMMARY),
+ metadataKeyspace,
Schema.JOB_SUMMARY),
params.jobId,
timeUUID,
params.buckets,
- params.keyspace,
- params.tables,
+ params.keyspaceTables.stream().map(KeyspaceTablePair::toCqlValueString).collect(Collectors.toList()),
sourceClusterName,
sourceClusterDesc,
targetClusterName,
@@ -355,34 +365,34 @@ public class JobMetadataDb {
if (rs.one().getBool("[applied]")) {
BatchStatement batch = new BatchStatement();
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
- keyspace, Schema.SOURCE_CLUSTER_INDEX),
+ metadataKeyspace, Schema.SOURCE_CLUSTER_INDEX),
sourceClusterName,
params.jobId));
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
- keyspace, Schema.TARGET_CLUSTER_INDEX),
+ metadataKeyspace, Schema.TARGET_CLUSTER_INDEX),
targetClusterName,
params.jobId));
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
- keyspace, Schema.KEYSPACE_INDEX),
- keyspace, params.jobId));
+ metadataKeyspace, Schema.KEYSPACE_INDEX),
+ metadataKeyspace, params.jobId));
batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
"VALUES ('%s', ?, ?, ?)",
- keyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
+ metadataKeyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
startDateTime.getHourOfDay(),
timeUUID, params.jobId));
session.execute(batch);
}
}
- public void finalizeJob(UUID jobId, Map<String, RangeStats> results) {
+ public void finalizeJob(UUID jobId, Map<KeyspaceTablePair, RangeStats> results) {
logger.info("Finalizing job status");
markNotRunning(jobId);
BatchStatement batch = new BatchStatement();
- for (Map.Entry<String, RangeStats> result : results.entrySet()) {
- String table = result.getKey();
+ for (Map.Entry<KeyspaceTablePair, RangeStats> result : results.entrySet()) {
+ KeyspaceTablePair table = result.getKey();
RangeStats stats = result.getValue();
session.execute(String.format("INSERT INTO %s.%s (" +
" job_id," +
- " table_name," +
+ " qualified_table_name," +
" matched_partitions," +
" mismatched_partitions," +
" partitions_only_in_source," +
@@ -392,9 +402,9 @@ public class JobMetadataDb {
" mismatched_values," +
" skipped_partitions) " +
"VALUES (?, ?, ?, ?, ?, ?, ?, ?,
?, ?)",
- keyspace, Schema.JOB_RESULTS),
+ metadataKeyspace,
Schema.JOB_RESULTS),
jobId,
- table,
+ table.toCqlValueString(),
stats.getMatchedPartitions(),
stats.getMismatchedPartitions(),
stats.getOnlyInSource(),
@@ -414,18 +424,18 @@ public class JobMetadataDb {
logger.info("Marking job {} as not running", jobId);
ResultSet rs = session.execute(String.format("DELETE FROM
%s.%s WHERE job_id = ? IF EXISTS",
- keyspace, Schema.RUNNING_JOBS),
+ metadataKeyspace,
Schema.RUNNING_JOBS),
jobId);
if (!rs.one().getBool("[applied]"))
{
logger.warn("Non-fatal: Unable to mark job %s as not
running, check logs for errors " +
- "during initialization as there may be no
entry for this job in the {} table",
+ "during initialization as there may be no
entry for this job {} in the {} table",
jobId, Schema.RUNNING_JOBS);
}
} catch (Exception e) {
// Because this is called from another exception handler, we don't want to lose the original exception
// just because we may not have been able to mark the job as not running. Just log here
- logger.error("Could not mark job {} as not running.", e);
+ logger.error("Could not mark job {} as not running.", jobId,
e);
}
}
}
@@ -436,7 +446,7 @@ public class JobMetadataDb {
private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text," +
+ "
qualified_table_name text," +
" start_token
varchar," +
" end_token varchar,"
+
" matched_partitions
bigint," +
@@ -448,7 +458,7 @@ public class JobMetadataDb {
" mismatched_values
bigint," +
" skipped_partitions
bigint," +
" last_token
varchar," +
- " PRIMARY
KEY((job_id, bucket), table_name, start_token, end_token))" +
+ " PRIMARY
KEY((job_id, bucket), qualified_table_name, start_token, end_token))" +
" WITH
default_time_to_live = %s";
public static final String JOB_SUMMARY = "job_summary";
@@ -456,8 +466,7 @@ public class JobMetadataDb {
" job_id uuid," +
" job_start_time
timeuuid," +
" buckets int," +
- " keyspace_name
text," +
- " table_names
frozen<list<text>>," +
+ "
qualified_table_names frozen<list<text>>," +
" source_cluster_name
text," +
" source_cluster_desc
text," +
" target_cluster_name
text," +
@@ -469,7 +478,7 @@ public class JobMetadataDb {
        public static final String JOB_RESULTS = "job_results";
        private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
                                                         " job_id uuid," +
-                                                        " table_name text," +
+                                                        " qualified_table_name text," +
                                                         " matched_partitions bigint," +
                                                         " mismatched_partitions bigint," +
                                                         " partitions_only_in_source bigint," +
@@ -478,46 +487,46 @@ public class JobMetadataDb {
" matched_values
bigint," +
" mismatched_values
bigint," +
" skipped_partitions
bigint," +
- " PRIMARY KEY(job_id,
table_name))" +
+ " PRIMARY KEY(job_id,
qualified_table_name))" +
" WITH
default_time_to_live = %s";
public static final String JOB_STATUS = "job_status";
private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT
EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text," +
+ " qualified_table_name
text," +
" completed counter," +
- " PRIMARY KEY
((job_id, bucket), table_name))";
+ " PRIMARY KEY
((job_id, bucket), qualified_table_name))";
public static final String MISMATCHES = "mismatches";
private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT
EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text, " +
+ " qualified_table_name
text, " +
" mismatching_token
varchar, " +
" mismatch_type text,
" +
- " PRIMARY KEY
((job_id, bucket), table_name, mismatching_token))" +
+ " PRIMARY KEY
((job_id, bucket), qualified_table_name, mismatching_token))" +
" WITH
default_time_to_live = %s";
public static final String ERROR_SUMMARY = "task_errors";
private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF
NOT EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text,"
+
+ "
qualified_table_name text," +
" start_token
varchar," +
" end_token
varchar," +
- " PRIMARY KEY
((job_id, bucket), table_name, start_token, end_token))" +
+ " PRIMARY KEY
((job_id, bucket), qualified_table_name, start_token, end_token))" +
" WITH
default_time_to_live = %s";
public static final String ERROR_DETAIL = "partition_errors";
private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT
EXISTS %s.%s (" +
" job_id uuid," +
" bucket int," +
- " table_name text," +
+ "
qualified_table_name text," +
" start_token
varchar," +
" end_token
varchar," +
" error_token
varchar," +
- " PRIMARY KEY
((job_id, bucket, table_name, start_token, end_token), error_token))" +
+ " PRIMARY KEY
((job_id, bucket, qualified_table_name, start_token, end_token), error_token))"
+
" WITH
default_time_to_live = %s";
public static final String SOURCE_CLUSTER_INDEX =
"source_cluster_index";
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
index 6434dc8..4214f2b 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -20,12 +20,13 @@
package org.apache.cassandra.diff;
import java.util.Iterator;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.Row;
public class PartitionComparator implements Callable<PartitionStats> {
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
index d31da4f..5ed225a 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
@@ -28,7 +28,10 @@ import java.util.stream.StreamSupport;
import com.google.common.annotations.VisibleForTesting;
-import com.datastax.driver.core.*;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Token;
import org.jetbrains.annotations.NotNull;
public class PartitionKey implements Comparable<PartitionKey> {
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
index 36ce2b5..bb5e937 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -20,11 +20,13 @@
package org.apache.cassandra.diff;
import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.Iterator;
+import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.*;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
import com.google.common.base.Verify;
import org.slf4j.Logger;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
index a0f8043..3aad1eb 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
@@ -19,7 +19,10 @@
package org.apache.cassandra.diff;
-import java.io.*;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.atomic.LongAdder;
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
index d2f0963..38c131d 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
@@ -19,25 +19,30 @@
package org.apache.cassandra.diff;
-import com.datastax.driver.core.*;
-import java.util.*;
+import java.util.List;
+import java.util.Objects;
import java.util.stream.Collectors;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnMetadata;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.TableMetadata;
+
import static org.apache.cassandra.diff.DiffContext.cqlizedString;
public class TableSpec {
- private final String table;
+ private final KeyspaceTablePair keyspaceTablePair;
private ImmutableList<ColumnMetadata> clusteringColumns;
private ImmutableList<ColumnMetadata> regularColumns;
- public String getTable()
+ public KeyspaceTablePair getTable()
{
- return table;
+ return keyspaceTablePair;
}
@@ -55,23 +60,23 @@ public class TableSpec {
     * @param clusteringColumns the clustering columns, retrieved from cluster using the client
     * @param regularColumns the non-primary key columns, retrieved from cluster using the client
     */
- TableSpec(final String table,
+ TableSpec(final KeyspaceTablePair table,
final List<ColumnMetadata> clusteringColumns,
final List<ColumnMetadata> regularColumns) {
- this.table = table;
+ this.keyspaceTablePair = table;
this.clusteringColumns = ImmutableList.copyOf(clusteringColumns);
this.regularColumns = ImmutableList.copyOf(regularColumns);
}
- public static TableSpec make(String table, DiffCluster diffCluster) {
+    public static TableSpec make(KeyspaceTablePair keyspaceTablePair, DiffCluster diffCluster) {
        final Cluster cluster = diffCluster.cluster;
-        final String cqlizedKeyspace = cqlizedString(diffCluster.keyspace);
-        final String cqlizedTable = cqlizedString(table);
+        final String cqlizedKeyspace = cqlizedString(keyspaceTablePair.keyspace);
+        final String cqlizedTable = cqlizedString(keyspaceTablePair.table);
        KeyspaceMetadata ksMetadata = cluster.getMetadata().getKeyspace(cqlizedKeyspace);
        if (ksMetadata == null) {
-            throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", diffCluster.keyspace, diffCluster.clusterId));
+            throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", keyspaceTablePair.keyspace, diffCluster.clusterId));
        }
TableMetadata tableMetadata = ksMetadata.getTable(cqlizedTable);
@@ -80,11 +85,11 @@ public class TableSpec {
                                                          .stream()
                                                          .filter(c -> !(clusteringColumns.contains(c)))
                                                          .collect(Collectors.toList());
-        return new TableSpec(tableMetadata.getName(), clusteringColumns, regularColumns);
+        return new TableSpec(KeyspaceTablePair.from(tableMetadata), clusteringColumns, regularColumns);
}
public boolean equalsNamesOnly(TableSpec other) {
- return this.table.equals(other.table)
+ return this.keyspaceTablePair.equals(other.keyspaceTablePair)
&&
columnNames(this.clusteringColumns).equals(columnNames(other.clusteringColumns))
&&
columnNames(this.regularColumns).equals(columnNames(other.regularColumns));
}
@@ -101,19 +106,19 @@ public class TableSpec {
return false;
TableSpec other = (TableSpec)o;
- return this.table.equals(other.table)
+ return this.keyspaceTablePair.equals(other.keyspaceTablePair)
&& this.clusteringColumns.equals(other.clusteringColumns)
&& this.regularColumns.equals(other.regularColumns);
}
public int hashCode() {
- return Objects.hash(table, clusteringColumns, regularColumns);
+        return Objects.hash(keyspaceTablePair, clusteringColumns, regularColumns);
}
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("table", table)
+ .add("table", keyspaceTablePair)
.add("clusteringColumns", clusteringColumns)
.add("regularColumns", regularColumns)
.toString();
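With the change above, a TableSpec is resolved per (keyspace, table) pair instead of against a single job-wide keyspace. A hedged usage sketch follows; the cluster wiring and the keyspace/table names are illustrative, not part of the patch:

    import org.apache.cassandra.diff.DiffCluster;
    import org.apache.cassandra.diff.KeyspaceTablePair;
    import org.apache.cassandra.diff.TableSpec;

    final class TableSpecUsageSketch {
        // Assumes diffCluster points at a live cluster containing keyspace1.standard1.
        static TableSpec resolve(DiffCluster diffCluster) {
            KeyspaceTablePair pair = new KeyspaceTablePair("keyspace1", "standard1");
            TableSpec spec = TableSpec.make(pair, diffCluster);
            // The spec carries the fully qualified identity, so identically named
            // tables in different keyspaces no longer collide.
            assert spec.getTable().equals(pair);
            return spec;
        }
    }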
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
index d8d92c7..9082970 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
@@ -24,9 +24,6 @@ import java.util.List;
import org.junit.Test;
-import org.apache.cassandra.diff.DiffJob;
-import org.apache.cassandra.diff.TokenHelper;
-
import static org.junit.Assert.assertEquals;
public class DiffJobTest
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
index 73677d9..e588575 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
@@ -24,14 +24,8 @@ import java.util.Map;
import java.util.function.Function;
import com.google.common.base.VerifyException;
-import org.junit.Test;
-
import com.google.common.collect.Lists;
-
-import org.apache.cassandra.diff.DiffJob;
-import org.apache.cassandra.diff.Differ;
-import org.apache.cassandra.diff.RangeStats;
-import org.apache.cassandra.diff.TokenHelper;
+import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -74,9 +68,9 @@ public class DifferTest {
        // * t2 is started and has reported some progress, but has not completed
        // * t3 has not reported any progress
        DiffJob.Split split = new DiffJob.Split(1, 1, BigInteger.ONE, BigInteger.TEN);
-        Iterable<String> tables = Lists.newArrayList("t1", "t2", "t3");
-        Function<String, DiffJob.TaskStatus> journal = (table) -> {
-            switch (table) {
+        Iterable<KeyspaceTablePair> tables = Lists.newArrayList(ksTbl("t1"), ksTbl("t2"), ksTbl("t3"));
+        Function<KeyspaceTablePair, DiffJob.TaskStatus> journal = (keyspaceTable) -> {
+            switch (keyspaceTable.table) {
            case "t1":
                return new DiffJob.TaskStatus(split.end, RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6));
            case "t2":
@@ -88,24 +82,27 @@ public class DifferTest {
}
};
-        Map<String, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
+        Map<KeyspaceTablePair, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
        assertEquals(2, filtered.keySet().size());
-        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
-        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
-        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
-        assertNull(filtered.get("t3").lastToken);
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get(ksTbl("t2")).stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get(ksTbl("t2")).lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get(ksTbl("t3")).stats);
+        assertNull(filtered.get(ksTbl("t3")).lastToken);
        // if re-running (part of) a job because of failures or problematic partitions, we want to
        // ignore the status of completed tasks and re-run them anyway as only specified tokens will
        // be processed - so t1 should be included now
        filtered = Differ.filterTables(tables, split, journal, true);
        assertEquals(3, filtered.keySet().size());
-        assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get("t1").stats);
-        assertEquals(split.end, filtered.get("t1").lastToken);
-        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
-        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
-        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
-        assertNull(filtered.get("t3").lastToken);
+        assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get(ksTbl("t1")).stats);
+        assertEquals(split.end, filtered.get(ksTbl("t1")).lastToken);
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get(ksTbl("t2")).stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get(ksTbl("t2")).lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get(ksTbl("t3")).stats);
+        assertNull(filtered.get(ksTbl("t3")).lastToken);
    }
+    private KeyspaceTablePair ksTbl(String table) {
+        return new KeyspaceTablePair("ks", table);
+    }
}
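The assertions above look entries up with freshly constructed ksTbl(...) keys, which only works because KeyspaceTablePair has value-based equals/hashCode. A minimal illustration of that assumption (placed in the same package as KeyspaceTablePair):

    import java.util.HashMap;
    import java.util.Map;

    final class PairKeyLookupSketch {
        static void demo() {
            Map<KeyspaceTablePair, String> statuses = new HashMap<>();
            statuses.put(new KeyspaceTablePair("ks", "t2"), "in progress");
            // An equal-valued key constructed later still finds the entry,
            // mirroring filtered.get(ksTbl("t2")) in the test.
            assert "in progress".equals(statuses.get(new KeyspaceTablePair("ks", "t2")));
        }
    }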
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
index 79b3638..9cd1892 100644
--- a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
@@ -33,9 +33,6 @@ import com.google.common.reflect.TypeToken;
import org.junit.Test;
import com.datastax.driver.core.*;
-import org.apache.cassandra.diff.PartitionComparator;
-import org.apache.cassandra.diff.PartitionStats;
-import org.apache.cassandra.diff.TableSpec;
import static org.junit.Assert.assertEquals;
@@ -221,7 +218,7 @@ public class PartitionComparatorTest {
}
    TableSpec spec(String table, List<String> clusteringColumns, List<String> regularColumns) {
-        return new TableSpec(table, columns(clusteringColumns), columns(regularColumns));
+        return new TableSpec(new KeyspaceTablePair("ks", table), columns(clusteringColumns), columns(regularColumns));
}
List<ColumnMetadata> columns(List<String> names) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]