C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106203170


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -256,14 +257,25 @@ private void maybeBeginTransaction() {
     private void commitTransaction() {
         log.debug("{} Committing offsets", this);
 
+        long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         long started = time.milliseconds();
+        long deadline = started + commitTimeoutMs;
 
         // We might have just aborted a transaction, in which case we'll have to begin a new one
         // in order to commit offsets
         maybeBeginTransaction();
 
         AtomicReference<Throwable> flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Provide a constant timeout value to wait indefinitely, as there should not be any concurrent flushes.
+            // This is because commitTransaction is always called on the same thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);

Review Comment:
   I agree with the comment about how this method should never be invoked while 
there are in-progress flushes.
   
   Given that, is there any reason to go to the work of calculating a deadline 
and deriving a timeout from it, instead of simply invoking this method with a 
timeout of zero?
   
   We could even add a no-arg variant of `beginFlush` that calls `beginFlush(0, 
TimeUnit.MILLISECONDS)`.
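
A minimal sketch of what such a no-arg variant could look like. `SketchOffsetWriter`, `offset`, and `finishFlush` are hypothetical stand-ins written for illustration, not the actual `OffsetStorageWriter` code; the sketch only assumes that a timed `beginFlush` throws `TimeoutException` when an earlier flush is still in progress once the timeout elapses.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for OffsetStorageWriter; not the real Kafka class.
class SketchOffsetWriter {
    private final Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;
    private CompletableFuture<Void> latestFlush = null;

    synchronized void offset(String key, String value) {
        data.put(key, value);
    }

    // No-arg variant: never waits for an earlier flush, fails fast instead.
    boolean beginFlush() throws InterruptedException, TimeoutException {
        return beginFlush(0, TimeUnit.MILLISECONDS);
    }

    // Timed variant: waits up to the timeout for an in-progress flush to finish.
    boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        while (true) {
            Future<Void> inProgress;
            synchronized (this) {
                if (toFlush == null) {
                    if (data.isEmpty()) {
                        return false; // nothing to flush
                    }
                    // Snapshot the current state; the caller performs the actual write.
                    toFlush = new HashMap<>(data);
                    data.clear();
                    latestFlush = new CompletableFuture<>();
                    return true;
                }
                inProgress = latestFlush;
            }
            try {
                // With a zero timeout this throws TimeoutException if the flush is not done yet.
                inProgress.get(timeout, unit);
            } catch (ExecutionException e) {
                // The owner of the earlier flush handles its failure; we only wait for it to end.
            }
        }
    }

    // Marks the in-progress flush as finished (hypothetical helper for the sketch).
    synchronized void finishFlush() {
        toFlush = null;
        if (latestFlush != null) {
            latestFlush.complete(null);
        }
    }
}
```

With this shape, `commitTransaction` could call `offsetWriter.beginFlush()` directly and treat a `TimeoutException` as a sign that the single-threaded assumption was violated, rather than computing a deadline first.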



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -104,44 +101,29 @@ private boolean flushing() {
         return toFlush != null;
     }
 
-    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
-        while (true) {
-            Future<Void> inProgressFlush;
-            synchronized (this) {
-                if (flushing()) {
-                    inProgressFlush = latestFlush;
-                } else {
-                    return beginFlush();
-                }
-            }
-            try {
-                inProgressFlush.get(timeout.get(), timeUnit);
-            } catch (ExecutionException e) {
-                // someone else is responsible for handling this error, we just want to wait for the flush to be over.
-            }
-        }
-    }
-
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
      * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the `timeout` elapses before previous flushes are complete.

Review Comment:
   Nit: Javadocs != markdown, should be `{@code timeout}` (without backticks).
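
For illustration, the tag with the inline `{@code ...}` form might read as follows. The enclosing class `JavadocSketch` and the placeholder method body are hypothetical; only the Javadoc comment is the point here.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical holder class; only the Javadoc comment matters in this sketch.
class JavadocSketch {
    /**
     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
     * @param timeUnit Units of the timeout argument
     * @return true if a flush was initiated, false if no data was available
     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
     * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete
     */
    static boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        return false; // placeholder body, no data is ever buffered in this sketch
    }
}
```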


