This is an automated email from the ASF dual-hosted git repository.

iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b2db56  [BEAM-7162] Add ValueProvider to CassandraIO Write
     new c38666a  Merge pull request #8413: [BEAM-7162] Add ValueProvider to 
CassandraIO Write
0b2db56 is described below

commit 0b2db56c25cca268bbd530fd9f9d3a4fcbc2315c
Author: Mathieu Blanchard <[email protected]>
AuthorDate: Fri Apr 26 15:12:21 2019 +0200

    [BEAM-7162] Add ValueProvider to CassandraIO Write
---
 .../apache/beam/sdk/io/cassandra/CassandraIO.java  | 98 +++++++++++-----------
 1 file changed, 51 insertions(+), 47 deletions(-)

diff --git 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index ea26769..f9e4080 100644
--- 
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++ 
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import 
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
-import 
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -256,13 +255,13 @@ public class CassandraIO {
       return builder().setUsername(username).build();
     }
 
-    /** Specify the password for authentication. */
+    /** Specify the password used for authentication. */
     public Read<T> withPassword(String password) {
       checkArgument(password != null, "password can not be null");
       return withPassword(ValueProvider.StaticValueProvider.of(password));
     }
 
-    /** Specify the clear password for authentication. */
+    /** Specify the password used for authentication. */
     public Read<T> withPassword(ValueProvider<String> password) {
       return builder().setPassword(password).build();
     }
@@ -740,28 +739,28 @@ public class CassandraIO {
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, 
PDone> {
     @Nullable
-    abstract ImmutableList<String> hosts();
+    abstract ValueProvider<List<String>> hosts();
 
     @Nullable
-    abstract Integer port();
+    abstract ValueProvider<Integer> port();
 
     @Nullable
-    abstract String keyspace();
+    abstract ValueProvider<String> keyspace();
 
     @Nullable
     abstract Class<T> entity();
 
     @Nullable
-    abstract String username();
+    abstract ValueProvider<String> username();
 
     @Nullable
-    abstract String password();
+    abstract ValueProvider<String> password();
 
     @Nullable
-    abstract String localDc();
+    abstract ValueProvider<String> localDc();
 
     @Nullable
-    abstract String consistencyLevel();
+    abstract ValueProvider<String> consistencyLevel();
 
     abstract MutationType mutationType();
 
@@ -785,6 +784,11 @@ public class CassandraIO {
               + getMutationTypeName()
               + "().withHosts(hosts) called with empty "
               + "hosts list");
+      return withHosts(ValueProvider.StaticValueProvider.of(hosts));
+    }
+
+    /** Specify the hosts of the Apache Cassandra instances. */
+    public Write<T> withHosts(ValueProvider<List<String>> hosts) {
       return builder().setHosts(hosts).build();
     }
 
@@ -797,6 +801,11 @@ public class CassandraIO {
               + "().withPort(port) called with invalid port "
               + "number (%s)",
           port);
+      return withPort(ValueProvider.StaticValueProvider.of(port));
+    }
+
+    /** Specify the port number of the Apache Cassandra instances. */
+    public Write<T> withPort(ValueProvider<Integer> port) {
       return builder().setPort(port).build();
     }
 
@@ -808,6 +817,11 @@ public class CassandraIO {
               + getMutationTypeName()
               + "().withKeyspace(keyspace) called with "
               + "null keyspace");
+      return withKeyspace(ValueProvider.StaticValueProvider.of(keyspace));
+    }
+
+    /** Specify the Cassandra keyspace where to read data. */
+    public Write<T> withKeyspace(ValueProvider<String> keyspace) {
       return builder().setKeyspace(keyspace).build();
     }
 
@@ -833,6 +847,11 @@ public class CassandraIO {
               + getMutationTypeName()
               + "().withUsername(username) called with "
               + "null username");
+      return withUsername(ValueProvider.StaticValueProvider.of(username));
+    }
+
+    /** Specify the username for authentication. */
+    public Write<T> withUsername(ValueProvider<String> username) {
       return builder().setUsername(username).build();
     }
 
@@ -844,6 +863,11 @@ public class CassandraIO {
               + getMutationTypeName()
               + "().withPassword(password) called with "
               + "null password");
+      return withPassword(ValueProvider.StaticValueProvider.of(password));
+    }
+
+    /** Specify the password used for authentication. */
+    public Write<T> withPassword(ValueProvider<String> password) {
       return builder().setPassword(password).build();
     }
 
@@ -855,6 +879,11 @@ public class CassandraIO {
               + getMutationTypeName()
               + "().withLocalDc(localDc) called with null"
               + " localDc");
+      return withLocalDc(ValueProvider.StaticValueProvider.of(localDc));
+    }
+
+    /** Specify the local DC used for the load balancing. */
+    public Write<T> withLocalDc(ValueProvider<String> localDc) {
       return builder().setLocalDc(localDc).build();
     }
 
@@ -865,6 +894,10 @@ public class CassandraIO {
               + getMutationTypeName()
               + "().withConsistencyLevel"
               + "(consistencyLevel) called with null consistencyLevel");
+      return 
withConsistencyLevel(ValueProvider.StaticValueProvider.of(consistencyLevel));
+    }
+
+    public Write<T> withConsistencyLevel(ValueProvider<String> 
consistencyLevel) {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
@@ -923,23 +956,23 @@ public class CassandraIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
-      abstract Builder<T> setHosts(List<String> hosts);
+      abstract Builder<T> setHosts(ValueProvider<List<String>> hosts);
 
-      abstract Builder<T> setPort(Integer port);
+      abstract Builder<T> setPort(ValueProvider<Integer> port);
 
-      abstract Builder<T> setKeyspace(String keyspace);
+      abstract Builder<T> setKeyspace(ValueProvider<String> keyspace);
 
       abstract Builder<T> setEntity(Class<T> entity);
 
       abstract Optional<Class<T>> entity();
 
-      abstract Builder<T> setUsername(String username);
+      abstract Builder<T> setUsername(ValueProvider<String> username);
 
-      abstract Builder<T> setPassword(String password);
+      abstract Builder<T> setPassword(ValueProvider<String> password);
 
-      abstract Builder<T> setLocalDc(String localDc);
+      abstract Builder<T> setLocalDc(ValueProvider<String> localDc);
 
-      abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+      abstract Builder<T> setConsistencyLevel(ValueProvider<String> 
consistencyLevel);
 
       abstract Builder<T> setMutationType(MutationType mutationType);
 
@@ -1011,35 +1044,6 @@ public class CassandraIO {
 
   /** Get a Cassandra cluster using hosts and port. */
   private static Cluster getCluster(
-      List<String> hosts,
-      int port,
-      String username,
-      String password,
-      String localDc,
-      String consistencyLevel) {
-    Cluster.Builder builder =
-        Cluster.builder().addContactPoints(hosts.toArray(new 
String[0])).withPort(port);
-
-    if (username != null) {
-      builder.withAuthProvider(new PlainTextAuthProvider(username, password));
-    }
-
-    DCAwareRoundRobinPolicy.Builder dcAwarePolicyBuilder = new 
DCAwareRoundRobinPolicy.Builder();
-    if (localDc != null) {
-      dcAwarePolicyBuilder.withLocalDc(localDc);
-    }
-
-    builder.withLoadBalancingPolicy(new 
TokenAwarePolicy(dcAwarePolicyBuilder.build()));
-
-    if (consistencyLevel != null) {
-      builder.withQueryOptions(
-          new 
QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel)));
-    }
-
-    return builder.build();
-  }
-
-  private static Cluster getCluster(
       ValueProvider<List<String>> hosts,
       ValueProvider<Integer> port,
       ValueProvider<String> username,
@@ -1092,7 +1096,7 @@ public class CassandraIO {
               spec.password(),
               spec.localDc(),
               spec.consistencyLevel());
-      this.session = cluster.connect(spec.keyspace());
+      this.session = cluster.connect(spec.keyspace().get());
       this.mapperFactoryFn = spec.mapperFactoryFn();
       this.mutateFutures = new ArrayList<>();
       this.mutator = mutator;

Reply via email to