This is an automated email from the ASF dual-hosted git repository.
echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 26b7018 [BEAM-6324] Add filter to query for CassandraIO.Read
new b3cd1ef Merge pull request #7340 from
srfrnk/cassandra-reader-with-query
26b7018 is described below
commit 26b701831b0368322c80d519e384a695c46999a7
Author: srfrnk <[email protected]>
AuthorDate: Wed Feb 20 06:38:01 2019 +0200
[BEAM-6324] Add filter to query for CassandraIO.Read
---
.../apache/beam/sdk/io/cassandra/CassandraIO.java | 101 ++++++++++++++++-----
.../beam/sdk/io/cassandra/CassandraIOIT.java | 39 +++++---
.../beam/sdk/io/cassandra/CassandraIOTest.java | 20 ++++
3 files changed, 128 insertions(+), 32 deletions(-)
diff --git
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 6ef0ef7..06e6d1a 100644
---
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -31,11 +31,10 @@ import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
import com.datastax.driver.core.policies.TokenAwarePolicy;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Select;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;
import com.google.auto.value.AutoValue;
+import com.google.common.base.Joiner;
import com.google.common.util.concurrent.ListenableFuture;
import java.math.BigInteger;
import java.util.ArrayList;
@@ -168,6 +167,9 @@ public class CassandraIO {
abstract String consistencyLevel();
@Nullable
+ abstract String where();
+
+ @Nullable
abstract Integer minNumberOfSplits();
abstract Builder<T> builder();
@@ -256,6 +258,24 @@ public class CassandraIO {
}
/**
+ * Specify a string with a partial {@code Where} clause. Note: Cassandra
places restrictions on
+ * the {@code Where} clause you may use. (e.g. filter on a
primary/clustering column only etc.)
+ *
+ * @param where Partial {@code Where} clause. Optional - If unspecified
will not filter the
+ * data.
+ * @see <a
href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+ * Documentation</a>
+ * @throws com.datastax.driver.core.exceptions.InvalidQueryException If
{@code Where} clause
makes the generated query invalid. Please consult <a
+ *
href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+ * Documentation</a> for more info on correct usage of the {@code
Where} clause.
+ */
+ public Read<T> withWhere(String where) {
+ checkArgument(where != null, "where can not be null");
+ return builder().setWhere(where).build();
+ }
+
+ /**
* It's possible that system.size_estimates isn't populated or that the
number of splits
* computed by Beam is still to low for Cassandra to handle it. This
setting allows to enforce a
* minimum number of splits in case Beam cannot compute it correctly.
@@ -303,6 +323,8 @@ public class CassandraIO {
abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+ abstract Builder<T> setWhere(String where);
+
abstract Builder<T> setMinNumberOfSplits(Integer minNumberOfSplits);
abstract Read<T> build();
@@ -351,7 +373,12 @@ public class CassandraIO {
LOG.warn(
"Only Murmur3Partitioner is supported for splitting, using an
unique source for "
+ "the read");
- String splitQuery = QueryBuilder.select().from(spec.keyspace(),
spec.table()).toString();
+ String splitQuery =
+ String.format(
+ "SELECT * FROM %s.%s%s;",
+ spec.keyspace(),
+ spec.table(),
+ spec.where() == null ? "" : String.format(" WHERE %s",
spec.where()));
return Collections.singletonList(
new CassandraIO.CassandraSource<>(spec,
Collections.singletonList(splitQuery)));
}
@@ -389,7 +416,6 @@ public class CassandraIO {
for (List<RingRange> split : splits) {
List<String> queries = new ArrayList<>();
for (RingRange range : split) {
- Select.Where builder = QueryBuilder.select().from(spec.keyspace(),
spec.table()).where();
if (range.isWrapping()) {
// A wrapping range is one that overlaps from the end of the
partitioner range and its
// start (ie : when the start token of the split is greater than
the end token)
@@ -397,25 +423,32 @@ public class CassandraIO {
// of
// the partitioner range, and the other from the start of the
partitioner range to the
// end token of the split.
- builder =
- builder.and(QueryBuilder.gte("token(" + partitionKey + ")",
range.getStart()));
- String query = builder.toString();
- LOG.debug("Cassandra generated read query : {}", query);
- queries.add(query);
-
+ queries.add(
+ generateRangeQuery(
+ spec.keyspace(),
+ spec.table(),
+ spec.where(),
+ partitionKey,
+ range.getStart(),
+ null));
// Generation of the second query of the wrapping range
- builder = QueryBuilder.select().from(spec.keyspace(),
spec.table()).where();
- builder = builder.and(QueryBuilder.lt("token(" + partitionKey +
")", range.getEnd()));
- query = builder.toString();
- LOG.debug("Cassandra generated read query : {}", query);
- queries.add(query);
+ queries.add(
+ generateRangeQuery(
+ spec.keyspace(),
+ spec.table(),
+ spec.where(),
+ partitionKey,
+ null,
+ range.getEnd()));
} else {
- builder =
- builder.and(QueryBuilder.gte("token(" + partitionKey + ")",
range.getStart()));
- builder = builder.and(QueryBuilder.lt("token(" + partitionKey +
")", range.getEnd()));
- String query = builder.toString();
- LOG.debug("Cassandra generated read query : {}", query);
- queries.add(query);
+ queries.add(
+ generateRangeQuery(
+ spec.keyspace(),
+ spec.table(),
+ spec.where(),
+ partitionKey,
+ range.getStart(),
+ range.getEnd()));
}
}
sources.add(new CassandraIO.CassandraSource<>(spec, queries));
@@ -423,6 +456,32 @@ public class CassandraIO {
return sources;
}
+ private static String generateRangeQuery(
+ String keyspace,
+ String table,
+ String where,
+ String partitionKey,
+ BigInteger rangeStart,
+ BigInteger rangeEnd) {
+ String query =
+ String.format(
+ "SELECT * FROM %s.%s WHERE %s;",
+ keyspace,
+ table,
+ Joiner.on(" AND ")
+ .skipNulls()
+ .join(
+ where == null ? null : String.format("(%s)", where),
+ rangeStart == null
+ ? null
+ : String.format("(token(%s)>=%d)", partitionKey,
rangeStart),
+ rangeEnd == null
+ ? null
+ : String.format("(token(%s)<%d)", partitionKey,
rangeEnd)));
+ LOG.debug("Cassandra generated read query : {}", query);
+ return query;
+ }
+
private static long getNumSplits(
long desiredBundleSizeBytes, long estimatedSizeBytes, @Nullable
Integer minNumberOfSplits) {
long numSplits =
diff --git
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
index 7e5ce37..28abf3f 100644
---
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
+++
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
@@ -51,19 +52,13 @@ import org.slf4j.LoggerFactory;
/**
* A test of {@link CassandraIO} on a concrete and independent Cassandra
instance.
*
- * <p>This test requires a running Cassandra instance, and the test dataset
must exists.
+ * <p>This test requires a running Cassandra instance at [localhost:9042], and
the test dataset must exist.
*
- * <p>You can run this test directly using Maven with:
+ * <p>You can run this test directly using Gradle with:
*
* <pre>{@code
- * ./gradlew integrationTest -p sdks/java/io/cassandra
-DintegrationTestPipelineOptions='[
- * "--cassandraHost=1.2.3.4",
- * "--cassandraPort=9042"
- * "--numberOfRecords=1000"
- * ]'
- * --tests org.apache.beam.sdk.io.cassandra.CassandraIOIT
- * -DintegrationTestRunner=direct
- * }</pre>
+ * ./gradlew integrationTest -p sdks/java/io/cassandra
-DintegrationTestPipelineOptions='["--cassandraHost=127.0.0.1","--cassandraPort=9042","--numberOfRecords=1000"]'
--tests org.apache.beam.sdk.io.cassandra.CassandraIOIT
-DintegrationTestRunner=direct
+ * </pre>
*/
@RunWith(JUnit4.class)
public class CassandraIOIT implements Serializable {
@@ -105,13 +100,18 @@ public class CassandraIOIT implements Serializable {
dropTable(options, KEYSPACE, TABLE);
}
- /** Tests writing then reading data for a HBase database. */
@Test
public void testWriteThenRead() {
runWrite();
runRead();
}
+ @Test
+ public void testWriteThenReadWithWhere() {
+ runWrite();
+ runReadWithWhere();
+ }
+
private void runWrite() {
pipelineWrite
.apply("GenSequence", GenerateSequence.from(0).to((long)
options.getNumberOfRecords()))
@@ -151,6 +151,23 @@ public class CassandraIOIT implements Serializable {
pipelineRead.run().waitUntilFinish();
}
+ private void runReadWithWhere() {
+ PCollection<Scientist> output =
+ pipelineRead.apply(
+ CassandraIO.<Scientist>read()
+ .withHosts(options.getCassandraHost())
+ .withPort(options.getCassandraPort())
+ .withKeyspace(KEYSPACE)
+ .withTable(TABLE)
+ .withEntity(Scientist.class)
+ .withCoder(SerializableCoder.of(Scientist.class))
+ .withWhere("id=100"));
+
+ PAssert.thatSingleton(output.apply("Count",
Count.globally())).isEqualTo(1L);
+
+ pipelineRead.run().waitUntilFinish();
+ }
+
private static Cluster getCluster(CassandraIOITOptions options) {
return Cluster.builder()
.addContactPoints(options.getCassandraHost().toArray(new String[0]))
diff --git
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index ab471d4..92d1068 100644
---
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -245,6 +245,26 @@ public class CassandraIOTest implements Serializable {
}
@Test
+ public void testReadWithWhere() throws Exception {
+ insertRecords();
+
+ PCollection<Scientist> output =
+ pipeline.apply(
+ CassandraIO.<Scientist>read()
+ .withHosts(Arrays.asList(CASSANDRA_HOST))
+ .withPort(CASSANDRA_PORT)
+ .withKeyspace(CASSANDRA_KEYSPACE)
+ .withTable(CASSANDRA_TABLE)
+ .withCoder(SerializableCoder.of(Scientist.class))
+ .withEntity(Scientist.class)
+ .withWhere("person_id=10"));
+
+ PAssert.thatSingleton(output.apply("Count",
Count.globally())).isEqualTo(1L);
+
+ pipeline.run();
+ }
+
+ @Test
public void testWrite() {
ArrayList<Scientist> scientists = buildScientists(NUM_ROWS);