echauchot commented on code in PR #19680:
URL: https://github.com/apache/flink/pull/19680#discussion_r887784305
##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -17,130 +17,160 @@
package org.apache.flink.batch.connectors.cassandra;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.cassandra.utils.SinkUtils;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicReference;
/**
- * CassandraOutputFormatBase is the common abstract class for writing into
Apache Cassandra.
+ * CassandraOutputFormatBase is the common abstract class for writing into
Apache Cassandra using
+ * output formats.
*
* @param <OUT> Type of the elements to write.
*/
-public abstract class CassandraOutputFormatBase<OUT> extends
RichOutputFormat<OUT> {
+public abstract class CassandraOutputFormatBase<OUT, V> extends
RichOutputFormat<OUT> {
private static final Logger LOG =
LoggerFactory.getLogger(CassandraOutputFormatBase.class);
- private final String insertQuery;
private final ClusterBuilder builder;
+ private Semaphore semaphore;
+ private Duration maxConcurrentRequestsTimeout =
Duration.ofMillis(Long.MAX_VALUE);
+ private int maxConcurrentRequests = Integer.MAX_VALUE;
private transient Cluster cluster;
- private transient Session session;
- private transient PreparedStatement prepared;
- private transient FutureCallback<ResultSet> callback;
- private transient Throwable exception = null;
-
- public CassandraOutputFormatBase(String insertQuery, ClusterBuilder
builder) {
- Preconditions.checkArgument(
- !Strings.isNullOrEmpty(insertQuery), "Query cannot be null or
empty");
+ protected transient Session session;
+ private transient FutureCallback<V> callback;
+ private AtomicReference<Throwable> throwable;
+
+ public CassandraOutputFormatBase(
+ ClusterBuilder builder,
+ int maxConcurrentRequests,
+ Duration maxConcurrentRequestsTimeout) {
Preconditions.checkNotNull(builder, "Builder cannot be null");
-
- this.insertQuery = insertQuery;
this.builder = builder;
+ Preconditions.checkArgument(
+ maxConcurrentRequests > 0, "Max concurrent requests is
expected to be positive");
+ this.maxConcurrentRequests = maxConcurrentRequests;
+ Preconditions.checkNotNull(
+ maxConcurrentRequestsTimeout, "Max concurrent requests timeout
cannot be null");
+ Preconditions.checkArgument(
+ !maxConcurrentRequestsTimeout.isNegative(),
+ "Max concurrent requests timeout is expected to be positive");
+ this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
}
+ /** Configure the connection to Cassandra. */
@Override
public void configure(Configuration parameters) {
this.cluster = builder.getCluster();
}
- /**
- * Opens a Session to Cassandra and initializes the prepared statement.
- *
- * @param taskNumber The number of the parallel instance.
- * @throws IOException Thrown, if the output could not be opened due to an
I/O problem.
- */
+ /** Opens a Session to Cassandra . */
@Override
public void open(int taskNumber, int numTasks) throws IOException {
+ throwable = new AtomicReference<>();
+ this.semaphore = new Semaphore(maxConcurrentRequests);
this.session = cluster.connect();
Review Comment:
I vastly reused `CassandraSinkBaseTest` code but I agree it is a pity to use
a mock only because of Cassandra session/cluster. I'll try to refactor to avoid
mocking and code duplication. I'll propose you something in a isolated commit
for ease of review.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]