This is an automated email from the ASF dual-hosted git repository.
iemejia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 0b2db56 [BEAM-7162] Add ValueProvider to CassandraIO Write
new c38666a Merge pull request #8413: [BEAM-7162] Add ValueProvider to
CassandraIO Write
0b2db56 is described below
commit 0b2db56c25cca268bbd530fd9f9d3a4fcbc2315c
Author: Mathieu Blanchard <[email protected]>
AuthorDate: Fri Apr 26 15:12:21 2019 +0200
[BEAM-7162] Add ValueProvider to CassandraIO Write
---
.../apache/beam/sdk/io/cassandra/CassandraIO.java | 98 +++++++++++-----------
1 file changed, 51 insertions(+), 47 deletions(-)
diff --git
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
index ea26769..f9e4080 100644
---
a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
+++
b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
@@ -59,7 +59,6 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import
org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Joiner;
-import
org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterators;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -256,13 +255,13 @@ public class CassandraIO {
return builder().setUsername(username).build();
}
- /** Specify the password for authentication. */
+ /** Specify the password used for authentication. */
public Read<T> withPassword(String password) {
checkArgument(password != null, "password can not be null");
return withPassword(ValueProvider.StaticValueProvider.of(password));
}
- /** Specify the clear password for authentication. */
+ /** Specify the password used for authentication. */
public Read<T> withPassword(ValueProvider<String> password) {
return builder().setPassword(password).build();
}
@@ -740,28 +739,28 @@ public class CassandraIO {
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>,
PDone> {
@Nullable
- abstract ImmutableList<String> hosts();
+ abstract ValueProvider<List<String>> hosts();
@Nullable
- abstract Integer port();
+ abstract ValueProvider<Integer> port();
@Nullable
- abstract String keyspace();
+ abstract ValueProvider<String> keyspace();
@Nullable
abstract Class<T> entity();
@Nullable
- abstract String username();
+ abstract ValueProvider<String> username();
@Nullable
- abstract String password();
+ abstract ValueProvider<String> password();
@Nullable
- abstract String localDc();
+ abstract ValueProvider<String> localDc();
@Nullable
- abstract String consistencyLevel();
+ abstract ValueProvider<String> consistencyLevel();
abstract MutationType mutationType();
@@ -785,6 +784,11 @@ public class CassandraIO {
+ getMutationTypeName()
+ "().withHosts(hosts) called with empty "
+ "hosts list");
+ return withHosts(ValueProvider.StaticValueProvider.of(hosts));
+ }
+
+ /** Specify the hosts of the Apache Cassandra instances. */
+ public Write<T> withHosts(ValueProvider<List<String>> hosts) {
return builder().setHosts(hosts).build();
}
@@ -797,6 +801,11 @@ public class CassandraIO {
+ "().withPort(port) called with invalid port "
+ "number (%s)",
port);
+ return withPort(ValueProvider.StaticValueProvider.of(port));
+ }
+
+ /** Specify the port number of the Apache Cassandra instances. */
+ public Write<T> withPort(ValueProvider<Integer> port) {
return builder().setPort(port).build();
}
@@ -808,6 +817,11 @@ public class CassandraIO {
+ getMutationTypeName()
+ "().withKeyspace(keyspace) called with "
+ "null keyspace");
+ return withKeyspace(ValueProvider.StaticValueProvider.of(keyspace));
+ }
+
+ /** Specify the Cassandra keyspace where to write data. */
+ public Write<T> withKeyspace(ValueProvider<String> keyspace) {
return builder().setKeyspace(keyspace).build();
}
@@ -833,6 +847,11 @@ public class CassandraIO {
+ getMutationTypeName()
+ "().withUsername(username) called with "
+ "null username");
+ return withUsername(ValueProvider.StaticValueProvider.of(username));
+ }
+
+ /** Specify the username for authentication. */
+ public Write<T> withUsername(ValueProvider<String> username) {
return builder().setUsername(username).build();
}
@@ -844,6 +863,11 @@ public class CassandraIO {
+ getMutationTypeName()
+ "().withPassword(password) called with "
+ "null password");
+ return withPassword(ValueProvider.StaticValueProvider.of(password));
+ }
+
+ /** Specify the password used for authentication. */
+ public Write<T> withPassword(ValueProvider<String> password) {
return builder().setPassword(password).build();
}
@@ -855,6 +879,11 @@ public class CassandraIO {
+ getMutationTypeName()
+ "().withLocalDc(localDc) called with null"
+ " localDc");
+ return withLocalDc(ValueProvider.StaticValueProvider.of(localDc));
+ }
+
+ /** Specify the local DC used for the load balancing. */
+ public Write<T> withLocalDc(ValueProvider<String> localDc) {
return builder().setLocalDc(localDc).build();
}
@@ -865,6 +894,10 @@ public class CassandraIO {
+ getMutationTypeName()
+ "().withConsistencyLevel"
+ "(consistencyLevel) called with null consistencyLevel");
+ return
withConsistencyLevel(ValueProvider.StaticValueProvider.of(consistencyLevel));
+ }
+
+ public Write<T> withConsistencyLevel(ValueProvider<String>
consistencyLevel) {
return builder().setConsistencyLevel(consistencyLevel).build();
}
@@ -923,23 +956,23 @@ public class CassandraIO {
@AutoValue.Builder
abstract static class Builder<T> {
- abstract Builder<T> setHosts(List<String> hosts);
+ abstract Builder<T> setHosts(ValueProvider<List<String>> hosts);
- abstract Builder<T> setPort(Integer port);
+ abstract Builder<T> setPort(ValueProvider<Integer> port);
- abstract Builder<T> setKeyspace(String keyspace);
+ abstract Builder<T> setKeyspace(ValueProvider<String> keyspace);
abstract Builder<T> setEntity(Class<T> entity);
abstract Optional<Class<T>> entity();
- abstract Builder<T> setUsername(String username);
+ abstract Builder<T> setUsername(ValueProvider<String> username);
- abstract Builder<T> setPassword(String password);
+ abstract Builder<T> setPassword(ValueProvider<String> password);
- abstract Builder<T> setLocalDc(String localDc);
+ abstract Builder<T> setLocalDc(ValueProvider<String> localDc);
- abstract Builder<T> setConsistencyLevel(String consistencyLevel);
+ abstract Builder<T> setConsistencyLevel(ValueProvider<String>
consistencyLevel);
abstract Builder<T> setMutationType(MutationType mutationType);
@@ -1011,35 +1044,6 @@ public class CassandraIO {
/** Get a Cassandra cluster using hosts and port. */
private static Cluster getCluster(
- List<String> hosts,
- int port,
- String username,
- String password,
- String localDc,
- String consistencyLevel) {
- Cluster.Builder builder =
- Cluster.builder().addContactPoints(hosts.toArray(new
String[0])).withPort(port);
-
- if (username != null) {
- builder.withAuthProvider(new PlainTextAuthProvider(username, password));
- }
-
- DCAwareRoundRobinPolicy.Builder dcAwarePolicyBuilder = new
DCAwareRoundRobinPolicy.Builder();
- if (localDc != null) {
- dcAwarePolicyBuilder.withLocalDc(localDc);
- }
-
- builder.withLoadBalancingPolicy(new
TokenAwarePolicy(dcAwarePolicyBuilder.build()));
-
- if (consistencyLevel != null) {
- builder.withQueryOptions(
- new
QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel)));
- }
-
- return builder.build();
- }
-
- private static Cluster getCluster(
ValueProvider<List<String>> hosts,
ValueProvider<Integer> port,
ValueProvider<String> username,
@@ -1092,7 +1096,7 @@ public class CassandraIO {
spec.password(),
spec.localDc(),
spec.consistencyLevel());
- this.session = cluster.connect(spec.keyspace());
+ this.session = cluster.connect(spec.keyspace().get());
this.mapperFactoryFn = spec.mapperFactoryFn();
this.mutateFutures = new ArrayList<>();
this.mutator = mutator;