This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 26b7018  [BEAM-6324] Add filter to query for CassandraIO.Read
     new b3cd1ef  Merge pull request #7340 from 
srfrnk/cassandra-reader-with-query
26b7018 is described below

commit 26b701831b0368322c80d519e384a695c46999a7
Author: srfrnk <[email protected]>
AuthorDate: Wed Feb 20 06:38:01 2019 +0200

    [BEAM-6324] Add filter to query for CassandraIO.Read
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 101 ++++++++++++++++-----
 .../beam/sdk/io/cassandra/CassandraIOIT.java       |  39 +++++---
 .../beam/sdk/io/cassandra/CassandraIOTest.java     |  20 ++++
 3 files changed, 128 insertions(+), 32 deletions(-)

diff --git 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index 6ef0ef7..06e6d1a 100644
--- 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -31,11 +31,10 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
 import com.datastax.driver.core.policies.TokenAwarePolicy;
-import com.datastax.driver.core.querybuilder.QueryBuilder;
-import com.datastax.driver.core.querybuilder.Select;
 import com.datastax.driver.mapping.Mapper;
 import com.datastax.driver.mapping.MappingManager;
 import com.google.auto.value.AutoValue;
+import com.google.common.base.Joiner;
 import com.google.common.util.concurrent.ListenableFuture;
 import java.math.BigInteger;
 import java.util.ArrayList;
@@ -168,6 +167,9 @@ public class CassandraIO {
     abstract String consistencyLevel();
 
     @Nullable
+    abstract String where();
+
+    @Nullable
     abstract Integer minNumberOfSplits();
 
     abstract Builder<T> builder();
@@ -256,6 +258,24 @@ public class CassandraIO {
     }
 
     /**
+     * Specify a string with a partial {@code Where} clause. Note: Cassandra 
places restrictions on
+     * the {@code Where} clause you may use. (e.g. filter on a 
primary/clustering column only etc.)
+     *
+     * @param where Partial {@code Where} clause. Optional - if unspecified, 
the data will not be
+     *     filtered.
+     * @see <a 
href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+     *     Documentation</a>
+     * @throws com.datastax.driver.core.exceptions.InvalidQueryException If 
the {@code Where} clause
+     *     makes the generated query invalid. Please consult <a
+     *     
href="http://cassandra.apache.org/doc/4.0/cql/dml.html#where-clause">CQL
+     *     Documentation</a> for more info on correct usage of the {@code 
Where} clause.
+     */
+    public Read<T> withWhere(String where) {
+      checkArgument(where != null, "where can not be null");
+      return builder().setWhere(where).build();
+    }
+
+    /**
      * It's possible that system.size_estimates isn't populated or that the 
number of splits
      * computed by Beam is still too low for Cassandra to handle it. This 
setting allows to enforce a
      * minimum number of splits in case Beam cannot compute it correctly.
@@ -303,6 +323,8 @@ public class CassandraIO {
 
       abstract Builder<T> setConsistencyLevel(String consistencyLevel);
 
+      abstract Builder<T> setWhere(String where);
+
       abstract Builder<T> setMinNumberOfSplits(Integer minNumberOfSplits);
 
       abstract Read<T> build();
@@ -351,7 +373,12 @@ public class CassandraIO {
           LOG.warn(
               "Only Murmur3Partitioner is supported for splitting, using an 
unique source for "
                   + "the read");
-          String splitQuery = QueryBuilder.select().from(spec.keyspace(), 
spec.table()).toString();
+          String splitQuery =
+              String.format(
+                  "SELECT * FROM %s.%s%s;",
+                  spec.keyspace(),
+                  spec.table(),
+                  spec.where() == null ? "" : String.format(" WHERE %s", 
spec.where()));
           return Collections.singletonList(
               new CassandraIO.CassandraSource<>(spec, 
Collections.singletonList(splitQuery)));
         }
@@ -389,7 +416,6 @@ public class CassandraIO {
       for (List<RingRange> split : splits) {
         List<String> queries = new ArrayList<>();
         for (RingRange range : split) {
-          Select.Where builder = QueryBuilder.select().from(spec.keyspace(), 
spec.table()).where();
           if (range.isWrapping()) {
             // A wrapping range is one that overlaps from the end of the 
partitioner range and its
             // start (ie : when the start token of the split is greater than 
the end token)
@@ -397,25 +423,32 @@ public class CassandraIO {
             // of
             // the partitioner range, and the other from the start of the 
partitioner range to the
             // end token of the split.
-            builder =
-                builder.and(QueryBuilder.gte("token(" + partitionKey + ")", 
range.getStart()));
-            String query = builder.toString();
-            LOG.debug("Cassandra generated read query : {}", query);
-            queries.add(query);
-
+            queries.add(
+                generateRangeQuery(
+                    spec.keyspace(),
+                    spec.table(),
+                    spec.where(),
+                    partitionKey,
+                    range.getStart(),
+                    null));
             // Generation of the second query of the wrapping range
-            builder = QueryBuilder.select().from(spec.keyspace(), 
spec.table()).where();
-            builder = builder.and(QueryBuilder.lt("token(" + partitionKey + 
")", range.getEnd()));
-            query = builder.toString();
-            LOG.debug("Cassandra generated read query : {}", query);
-            queries.add(query);
+            queries.add(
+                generateRangeQuery(
+                    spec.keyspace(),
+                    spec.table(),
+                    spec.where(),
+                    partitionKey,
+                    null,
+                    range.getEnd()));
           } else {
-            builder =
-                builder.and(QueryBuilder.gte("token(" + partitionKey + ")", 
range.getStart()));
-            builder = builder.and(QueryBuilder.lt("token(" + partitionKey + 
")", range.getEnd()));
-            String query = builder.toString();
-            LOG.debug("Cassandra generated read query : {}", query);
-            queries.add(query);
+            queries.add(
+                generateRangeQuery(
+                    spec.keyspace(),
+                    spec.table(),
+                    spec.where(),
+                    partitionKey,
+                    range.getStart(),
+                    range.getEnd()));
           }
         }
         sources.add(new CassandraIO.CassandraSource<>(spec, queries));
@@ -423,6 +456,32 @@ public class CassandraIO {
       return sources;
     }
 
+    private static String generateRangeQuery(
+        String keyspace,
+        String table,
+        String where,
+        String partitionKey,
+        BigInteger rangeStart,
+        BigInteger rangeEnd) {
+      String query =
+          String.format(
+              "SELECT * FROM %s.%s WHERE %s;",
+              keyspace,
+              table,
+              Joiner.on(" AND ")
+                  .skipNulls()
+                  .join(
+                      where == null ? null : String.format("(%s)", where),
+                      rangeStart == null
+                          ? null
+                          : String.format("(token(%s)>=%d)", partitionKey, 
rangeStart),
+                      rangeEnd == null
+                          ? null
+                          : String.format("(token(%s)<%d)", partitionKey, 
rangeEnd)));
+      LOG.debug("Cassandra generated read query : {}", query);
+      return query;
+    }
+
     private static long getNumSplits(
         long desiredBundleSizeBytes, long estimatedSizeBytes, @Nullable 
Integer minNumberOfSplits) {
       long numSplits =
diff --git 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
 
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
index 7e5ce37..28abf3f 100644
--- 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
+++ 
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOIT.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
@@ -51,19 +52,13 @@ import org.slf4j.LoggerFactory;
 /**
  * A test of {@link CassandraIO} on a concrete and independent Cassandra 
instance.
  *
- * <p>This test requires a running Cassandra instance, and the test dataset 
must exists.
+ * <p>This test requires a running Cassandra instance at [localhost:9042], and 
the test dataset must exist.
  *
- * <p>You can run this test directly using Maven with:
+ * <p>You can run this test directly using Gradle with:
  *
  * <pre>{@code
- * ./gradlew integrationTest -p sdks/java/io/cassandra 
-DintegrationTestPipelineOptions='[
- * "--cassandraHost=1.2.3.4",
- * "--cassandraPort=9042"
- * "--numberOfRecords=1000"
- * ]'
- * --tests org.apache.beam.sdk.io.cassandra.CassandraIOIT
- * -DintegrationTestRunner=direct
- * }</pre>
+ * ./gradlew integrationTest -p sdks/java/io/cassandra 
-DintegrationTestPipelineOptions='["--cassandraHost=127.0.0.1","--cassandraPort=9042","--numberOfRecords=1000"]'
 --tests org.apache.beam.sdk.io.cassandra.CassandraIOIT 
-DintegrationTestRunner=direct
+ * </pre>
  */
 @RunWith(JUnit4.class)
 public class CassandraIOIT implements Serializable {
@@ -105,13 +100,18 @@ public class CassandraIOIT implements Serializable {
     dropTable(options, KEYSPACE, TABLE);
   }
 
-  /** Tests writing then reading data for a HBase database. */
   @Test
   public void testWriteThenRead() {
     runWrite();
     runRead();
   }
 
+  @Test
+  public void testWriteThenReadWithWhere() {
+    runWrite();
+    runReadWithWhere();
+  }
+
   private void runWrite() {
     pipelineWrite
         .apply("GenSequence", GenerateSequence.from(0).to((long) 
options.getNumberOfRecords()))
@@ -151,6 +151,23 @@ public class CassandraIOIT implements Serializable {
     pipelineRead.run().waitUntilFinish();
   }
 
+  private void runReadWithWhere() {
+    PCollection<Scientist> output =
+        pipelineRead.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(options.getCassandraHost())
+                .withPort(options.getCassandraPort())
+                .withKeyspace(KEYSPACE)
+                .withTable(TABLE)
+                .withEntity(Scientist.class)
+                .withCoder(SerializableCoder.of(Scientist.class))
+                .withWhere("id=100"));
+
+    PAssert.thatSingleton(output.apply("Count", 
Count.globally())).isEqualTo(1L);
+
+    pipelineRead.run().waitUntilFinish();
+  }
+
   private static Cluster getCluster(CassandraIOITOptions options) {
     return Cluster.builder()
         .addContactPoints(options.getCassandraHost().toArray(new String[0]))
diff --git 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
 
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
index ab471d4..92d1068 100644
--- 
a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
+++ 
b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java
@@ -245,6 +245,26 @@ public class CassandraIOTest implements Serializable {
   }
 
   @Test
+  public void testReadWithWhere() throws Exception {
+    insertRecords();
+
+    PCollection<Scientist> output =
+        pipeline.apply(
+            CassandraIO.<Scientist>read()
+                .withHosts(Arrays.asList(CASSANDRA_HOST))
+                .withPort(CASSANDRA_PORT)
+                .withKeyspace(CASSANDRA_KEYSPACE)
+                .withTable(CASSANDRA_TABLE)
+                .withCoder(SerializableCoder.of(Scientist.class))
+                .withEntity(Scientist.class)
+                .withWhere("person_id=10"));
+
+    PAssert.thatSingleton(output.apply("Count", 
Count.globally())).isEqualTo(1L);
+
+    pipeline.run();
+  }
+
+  @Test
   public void testWrite() {
     ArrayList<Scientist> scientists = buildScientists(NUM_ROWS);
 

Reply via email to