C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106203170

##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ExactlyOnceWorkerSourceTask.java:
##########
@@ -256,14 +257,25 @@ private void maybeBeginTransaction() {
     private void commitTransaction() {
         log.debug("{} Committing offsets", this);
+        long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
         long started = time.milliseconds();
+        long deadline = started + commitTimeoutMs;
         // We might have just aborted a transaction, in which case we'll have to begin a new one
         // in order to commit offsets
         maybeBeginTransaction();
         AtomicReference<Throwable> flushError = new AtomicReference<>();
-        if (offsetWriter.beginFlush()) {
+        boolean shouldFlush = false;
+        try {
+            // Provide a constant timeout value to wait indefinitely, as there should not be any concurrent flushes.
+            // This is because commitTransaction is always called on the same thread, and should always block until
+            // the flush is complete, or cancel the flush if an error occurs.
+            shouldFlush = offsetWriter.beginFlush(deadline - time.milliseconds(), TimeUnit.MILLISECONDS);

Review Comment:
   I agree with the comment about how this method should never be invoked while there are in-progress flushes. Given that, is there any reason to go to the work of calculating a deadline and deriving a timeout from it, instead of simply invoking this method with a timeout of zero? We could even add a no-arg variant of `beginFlush` that calls `beginFlush(0, TimeUnit.MILLISECONDS)`.

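   To illustrate the no-arg variant, here is a minimal sketch. It is not the actual `OffsetStorageWriter`: the class name `FlushSketch`, the semaphore-based guard, and the `offset`/`finishFlush` helpers are assumptions made only to keep the example self-contained. The point is that the zero-timeout overload never waits, so a caller that expects no concurrent flush fails fast instead of blocking.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for OffsetStorageWriter; not the real Kafka Connect class.
class FlushSketch {
    // Single permit, held while a flush is in progress (an assumption of this sketch).
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    // Records an offset to be included in the next flush.
    public synchronized void offset(String key, String value) {
        data.put(key, value);
    }

    // Timed variant: waits up to the timeout for any previous flush to finish.
    public boolean beginFlush(long timeout, TimeUnit timeUnit)
            throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                // Nothing to flush, so give the permit back immediately.
                flushInProgress.release();
                return false;
            }
            // Snapshot the current data and start collecting into a fresh map.
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    // No-arg variant as suggested above: a timeout of zero, so it never waits.
    public boolean beginFlush() throws InterruptedException, TimeoutException {
        return beginFlush(0, TimeUnit.MILLISECONDS);
    }

    // Marks the snapshot as written and allows the next flush to begin.
    public synchronized void finishFlush() {
        if (toFlush != null) {
            toFlush = null;
            flushInProgress.release();
        }
    }
}
```

   With this shape, `commitTransaction` would not need `commitTimeoutMs` or `deadline` just to call `beginFlush`; a second call made while a flush is still in progress throws `TimeoutException` right away.
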
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##########
@@ -104,44 +101,29 @@ private boolean flushing() {
         return toFlush != null;
     }
-    public boolean waitForBeginFlush(Supplier<Long> timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
-        while (true) {
-            Future<Void> inProgressFlush;
-            synchronized (this) {
-                if (flushing()) {
-                    inProgressFlush = latestFlush;
-                } else {
-                    return beginFlush();
-                }
-            }
-            try {
-                inProgressFlush.get(timeout.get(), timeUnit);
-            } catch (ExecutionException e) {
-                // someone else is responsible for handling this error, we just want to wait for the flush to be over.
-            }
-        }
-    }
-
     /**
      * Performs the first step of a flush operation, snapshotting the current state. This does not
-     * actually initiate the flush with the underlying storage.
+     * actually initiate the flush with the underlying storage. Ensures that any previous flush operations
+     * have finished before beginning a new flush.
      *
+     * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting
+     * @param timeUnit Units of the timeout argument
      * @return true if a flush was initiated, false if no data was available
+     * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete
+     * @throws TimeoutException if the `timeout` elapses before previous flushes are complete.

Review Comment:
   Nit: Javadocs != markdown, should be `{@code timeout}` (without backticks).