echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r427861203
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -106,19 +107,13 @@
*
* <h3>Cassandra Socket Options</h3>
*
- * <p>The following example illustrates various options for tuning client socket:
+ * <p>The following example illustrates setting timeouts for the Cassandra client:
Review comment:
Can you also change a leftover in the javadoc: An IO to read **and
write** **from/to** Apache Cassandra?
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
return builder().setMinNumberOfSplits(minNumberOfSplits).build();
}
+ /** Cassandra client socket option to set the connect timeout. */
+ public Read<T> withConnectTimeout(Integer timeout) {
+ return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
Review comment:
- You need a javadoc on both methods, as they are both public (there are 2
versions for backward compatibility, as ValueProvider was introduced recently).
- Specify that the timeouts are in milliseconds.
- Add links to the Cassandra SocketOptions setConnectTimeoutMillis method.
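To illustrate the javadoc shape requested here, a minimal sketch follows. It is not the actual CassandraIO code: `ConnectTimeoutDocSketch` is a hypothetical stand-in for `Read<T>`, and `java.util.function.Supplier` stands in for Beam's `ValueProvider` so that the snippet compiles without dependencies. The `@see` target assumes the DataStax driver's `SocketOptions#setConnectTimeoutMillis(int)`, which is the method the PR already calls in `getCluster`.

```java
import java.util.function.Supplier;

/** Hypothetical, trimmed-down stand-in for CassandraIO.Read; only the javadoc shape matters. */
final class ConnectTimeoutDocSketch {
  // Assumption: Supplier<Integer> replaces ValueProvider<Integer> to avoid a Beam dependency.
  private final Supplier<Integer> connectTimeout;

  ConnectTimeoutDocSketch(Supplier<Integer> connectTimeout) {
    this.connectTimeout = connectTimeout;
  }

  /**
   * Sets the Cassandra client connect timeout, in milliseconds.
   *
   * @param timeout connect timeout in milliseconds
   * @see com.datastax.driver.core.SocketOptions#setConnectTimeoutMillis(int)
   */
  ConnectTimeoutDocSketch withConnectTimeout(Integer timeout) {
    // Wrap the static value and delegate to the deferred-value overload.
    return withConnectTimeout(() -> timeout);
  }

  /**
   * Sets the Cassandra client connect timeout, in milliseconds, from a value resolved at runtime.
   *
   * @param timeout provider of the connect timeout in milliseconds
   * @see com.datastax.driver.core.SocketOptions#setConnectTimeoutMillis(int)
   */
  ConnectTimeoutDocSketch withConnectTimeout(Supplier<Integer> timeout) {
    return new ConnectTimeoutDocSketch(timeout);
  }

  /** Returns the configured connect timeout in milliseconds. */
  Integer connectTimeoutMillis() {
    return connectTimeout.get();
  }
}
```

The same pattern would apply to the read timeout methods, with the link pointing at `setReadTimeoutMillis` instead.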
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
return builder().setMinNumberOfSplits(minNumberOfSplits).build();
}
+ /** Cassandra client socket option to set the connect timeout. */
+ public Read<T> withConnectTimeout(Integer timeout) {
+ return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
Review comment:
Add an input value check (!= null && > 0) with a checkArgument call, as in the
other methods. This version knows the concrete value before it delegates to the
ValueProvider version, so put the checkArgument here. Do not forget to add the
same validation to the other parameter methods.
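A minimal sketch of the validation I have in mind, under stated assumptions: `ReadSketch` is a hypothetical stand-in for `Read<T>`, `Supplier` stands in for Beam's `ValueProvider`, and the local `checkArgument` helper mimics Guava's `Preconditions.checkArgument` so that the snippet has no dependencies. The error message wording is only an example.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for CassandraIO.Read, reduced to the connect timeout. */
final class ReadSketch {
  // Assumption: Supplier<Integer> replaces ValueProvider<Integer> to avoid a Beam dependency.
  private final Supplier<Integer> connectTimeout;

  private ReadSketch(Supplier<Integer> connectTimeout) {
    this.connectTimeout = connectTimeout;
  }

  static ReadSketch create() {
    return new ReadSketch(null);
  }

  // Local stand-in for Guava's Preconditions.checkArgument(boolean, Object).
  private static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  /** Validates the static value first, then delegates to the deferred-value overload. */
  ReadSketch withConnectTimeout(Integer timeout) {
    checkArgument(
        timeout != null && timeout > 0,
        "Connect timeout must be a positive number of milliseconds, but was: " + timeout);
    return withConnectTimeout(() -> timeout);
  }

  /** No eager check here: the value may not be available until the pipeline runs. */
  ReadSketch withConnectTimeout(Supplier<Integer> timeout) {
    return new ReadSketch(timeout);
  }

  Integer connectTimeoutMillis() {
    return connectTimeout.get();
  }

  public static void main(String[] args) {
    System.out.println(ReadSketch.create().withConnectTimeout(5000).connectTimeoutMillis());
    try {
      ReadSketch.create().withConnectTimeout(0);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Checking in the `Integer` overload keeps the failure at pipeline construction time, which is where the other `with*` methods fail as well.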
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
return builder().setMinNumberOfSplits(minNumberOfSplits).build();
}
+ /** Cassandra client socket option to set the connect timeout. */
+ public Read<T> withConnectTimeout(Integer timeout) {
+ return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+ return builder().setConnectTimeout(timeout).build();
+ }
+
+ /** Cassandra client socket option to set the read timeout. */
+ public Read<T> withReadTimeout(Integer timeout) {
+ return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {
Review comment:
same as above
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
return builder().setConsistencyLevel(consistencyLevel).build();
}
+ /** Cassandra client socket option for connect timeout. */
+ public Write<T> withConnectTimeout(Integer timeout) {
+ return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+ return builder().setConnectTimeout(timeout).build();
Review comment:
add read timeout
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1023,6 +1065,8 @@ private String getMutationTypeName() {
abstract Builder<T> setMutationType(MutationType mutationType);
+ abstract Builder<T> setConnectTimeout(ValueProvider<Integer> timeout);
Review comment:
add read timeout
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
return builder().setConsistencyLevel(consistencyLevel).build();
}
+ /** Cassandra client socket option for connect timeout. */
+ public Write<T> withConnectTimeout(Integer timeout) {
+ return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+ }
+
+ public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {
Review comment:
Add withReadTimeout (cf. global review comment)
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -811,6 +840,9 @@ public T getCurrent() throws NoSuchElementException {
abstract MutationType mutationType();
+ @Nullable
Review comment:
Add readTimeout (cf. global review comment)
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
}
+ SocketOptions socketOptions = new SocketOptions();
+
+ builder.withSocketOptions(socketOptions);
+
+ if (connectTimeout != null) {
Review comment:
The null check is no longer needed if both timeouts are set as part of Read and
Write and if you add the input validation in the with* methods.
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
}
+ SocketOptions socketOptions = new SocketOptions();
+
+ builder.withSocketOptions(socketOptions);
+
+ if (connectTimeout != null) {
+ socketOptions.setConnectTimeoutMillis(connectTimeout.get());
+ }
+
+ if (readTimeout != null) {
Review comment:
same as above
##########
File path:
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1142,7 +1201,9 @@ private static Cluster getCluster(
spec.username(),
spec.password(),
spec.localDc(),
- spec.consistencyLevel());
+ spec.consistencyLevel(),
+ spec.connectTimeout(),
+ null);
Review comment:
pass the read timeout