echauchot commented on code in PR #19680:
URL: https://github.com/apache/flink/pull/19680#discussion_r874518997


##########
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java:
##########
@@ -17,130 +17,191 @@
 
 package org.apache.flink.batch.connectors.cassandra;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connectors.cassandra.utils.SinkUtils;
 import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
 import org.apache.flink.util.Preconditions;
 
 import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.PreparedStatement;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.ResultSetFuture;
 import com.datastax.driver.core.Session;
-import com.google.common.base.Strings;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * CassandraOutputFormatBase is the common abstract class for writing into 
Apache Cassandra.
+ * CassandraOutputFormatBase is the common abstract class for writing into 
Apache Cassandra using
+ * output formats.
  *
  * @param <OUT> Type of the elements to write.
  */
-public abstract class CassandraOutputFormatBase<OUT> extends 
RichOutputFormat<OUT> {
+public abstract class CassandraOutputFormatBase<OUT, V> extends 
RichOutputFormat<OUT> {
     private static final Logger LOG = 
LoggerFactory.getLogger(CassandraOutputFormatBase.class);
 
-    private final String insertQuery;
     private final ClusterBuilder builder;
+    private Semaphore semaphore;
+    private Duration maxConcurrentRequestsTimeout = 
Duration.ofMillis(Long.MAX_VALUE);
+    private int maxConcurrentRequests = Integer.MAX_VALUE;
 
     private transient Cluster cluster;
-    private transient Session session;
-    private transient PreparedStatement prepared;
-    private transient FutureCallback<ResultSet> callback;
-    private transient Throwable exception = null;
+    protected transient Session session;
+    private transient FutureCallback<V> callback;
+    private AtomicReference<Throwable> throwable;
 
-    public CassandraOutputFormatBase(String insertQuery, ClusterBuilder 
builder) {
-        Preconditions.checkArgument(
-                !Strings.isNullOrEmpty(insertQuery), "Query cannot be null or 
empty");
+    public CassandraOutputFormatBase(ClusterBuilder builder) {
         Preconditions.checkNotNull(builder, "Builder cannot be null");
-
-        this.insertQuery = insertQuery;
         this.builder = builder;
     }
 
+    /**
+     * Sets the timeout used when acquiring a permit to execute a request.
+     *
+     * @param maxConcurrentRequestsTimeout timeout duration when acquiring a 
permit to execute
+     */
+    public void setMaxConcurrentRequestsTimeout(Duration 
maxConcurrentRequestsTimeout) {
+        Preconditions.checkNotNull(
+                maxConcurrentRequestsTimeout, "Max concurrent requests timeout 
cannot be null");
+        Preconditions.checkArgument(
+                !maxConcurrentRequestsTimeout.isNegative(),
+                "Max concurrent requests timeout is expected to be positive");
+
+        this.maxConcurrentRequestsTimeout = maxConcurrentRequestsTimeout;
+    }
+
+    /**
+     * Sets the maximum allowed number of concurrent requests for this output 
format.
+     *
+     * @param maxConcurrentRequests maximum number of concurrent requests 
allowed
+     */
+    public void setMaxConcurrentRequests(int maxConcurrentRequests) {
+        Preconditions.checkArgument(
+                maxConcurrentRequests > 0, "Max concurrent requests is 
expected to be positive");
+        this.maxConcurrentRequests = maxConcurrentRequests;
+    }
+
+    /**
+     * Configure the connection to Cassandra.
+     *
+     * @param parameters The configuration with all parameters.
+     */
     @Override
     public void configure(Configuration parameters) {
         this.cluster = builder.getCluster();
     }
 
     /**
-     * Opens a Session to Cassandra and initializes the prepared statement.
+     * Opens a Session to Cassandra.
      *
      * @param taskNumber The number of the parallel instance.
-     * @throws IOException Thrown, if the output could not be opened due to an 
I/O problem.
+     * @throws IOException Thrown, if the output format could not be opened 
due to an I/O problem.
      */
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
+        throwable = new AtomicReference<>();
+        this.semaphore = new Semaphore(maxConcurrentRequests);
         this.session = cluster.connect();
-        this.prepared = session.prepare(insertQuery);
         this.callback =
-                new FutureCallback<ResultSet>() {
+                new FutureCallback<V>() {
                     @Override
-                    public void onSuccess(ResultSet ignored) {
-                        onWriteSuccess(ignored);
+                    public void onSuccess(V ignored) {
+                        semaphore.release();
                     }
 
                     @Override
                     public void onFailure(Throwable t) {
-                        onWriteFailure(t);
+                        throwable.compareAndSet(null, t);
+                        LOG.error("Error while writing value.", t);
+                        semaphore.release();
                     }
                 };
     }
 
-    @Override
-    public void writeRecord(OUT record) throws IOException {
-        if (exception != null) {
-            throw new IOException("write record failed", exception);
-        }
-
-        Object[] fields = extractFields(record);
-        ResultSetFuture result = session.executeAsync(prepared.bind(fields));
-        Futures.addCallback(result, callback);
+    private void flush() throws InterruptedException, TimeoutException {
+        tryAcquire(maxConcurrentRequests);
+        semaphore.release(maxConcurrentRequests);
     }
 
-    protected abstract Object[] extractFields(OUT record);
+    private void tryAcquire(int permits) throws InterruptedException, 
TimeoutException {
+        SinkUtils.tryAcquire(
+                permits, maxConcurrentRequests, maxConcurrentRequestsTimeout, 
semaphore);
+    }
 
     /**
-     * Callback that is invoked after a record is written to Cassandra 
successfully.
-     *
-     * <p>Subclass can override to provide its own logic.
+     * Writes a record to Cassandra.
      *
-     * @param ignored the result.
+     * @param record The record to add to the output.
+     * @throws Exception Thrown, if the record could not be written due to an 
I/O problem or a
+     *     timeout.
      */
-    protected void onWriteSuccess(ResultSet ignored) {}
+    @Override
+    public void writeRecord(OUT record) throws Exception {
+        checkAsyncErrors();
+        tryAcquire(1);
+        final ListenableFuture<V> result;
+        try {
+            result = send(record);
+        } catch (Throwable e) {
+            semaphore.release();
+            throw e;
+        }
+        Futures.addCallback(result, callback);
+    }
+
+    public abstract ListenableFuture<V> send(OUT value);

Review Comment:
   yes, thanks for the catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to