harangozop commented on code in PR #24654: URL: https://github.com/apache/pulsar/pull/24654#discussion_r2336177014
########## pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java: ########## @@ -162,38 +165,103 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws sourceTask.initialize(sourceTaskContext); sourceTask.start(taskConfig); } + private void onOffsetsFlushed(Throwable error, CompletableFuture<Void> snapshotFlushFuture) { + if (error != null) { + log.error("Failed to flush offsets to storage: ", error); + offsetWriter.cancelFlush(); + snapshotFlushFuture.completeExceptionally(new Exception("No Offsets Added Error", error)); + return; + } + try { + sourceTask.commit(); + if (log.isDebugEnabled()) { + log.debug("Finished flushing offsets to storage"); + } + snapshotFlushFuture.complete(null); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.warn("Flush interrupted, cancelling", ie); + offsetWriter.cancelFlush(); + snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", ie)); + } catch (Throwable t) { + log.warn("Flush failed, cancelling", t); + offsetWriter.cancelFlush(); + snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", t)); + } + } + + private void triggerOffsetsFlushIfNeeded() { + final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get(); + // Only flush when we have a batch in flight, nothing outstanding, and a pending future + if (snapshotFlushFuture == null || snapshotFlushFuture.isDone() || outstandingRecords.get() != 0) { + return; + } + if (!flushing.compareAndSet(false, true)) { + return; // someone else is flushing + } + try { + if (offsetWriter.beginFlush()) { + offsetWriter.doFlush((error, ignored) -> { + try { + onOffsetsFlushed(error, snapshotFlushFuture); + } finally { + flushing.set(false); + } + }); + } else { + try { + onOffsetsFlushed(null, snapshotFlushFuture); + } finally { + flushing.set(false); + } + } + } catch (ConnectException alreadyFlushing) { + // Another thread initiated the flush; let their callback complete the future. + // Keep 'flushing' = true until read() finalizes the batch. + } catch (Exception t) { + try { + onOffsetsFlushed(t, snapshotFlushFuture); + } finally { + flushing.set(false); + } + } + } @Override public synchronized Record<T> read() throws Exception { while (true) { if (currentBatch == null) { - flushFuture = new CompletableFuture<>(); List<SourceRecord> recordList = sourceTask.poll(); if (recordList == null || recordList.isEmpty()) { continue; } outstandingRecords.addAndGet(recordList.size()); currentBatch = recordList.iterator(); + + final CompletableFuture<Void> newFuture = new CompletableFuture<>(); + flushFutureRef.set(newFuture); } if (currentBatch.hasNext()) { AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next()); if (processRecord == null || processRecord.isEmpty()) { outstandingRecords.decrementAndGet(); - continue; + triggerOffsetsFlushIfNeeded(); } else { return processRecord; } } else { - // there is no records any more, then waiting for the batch to complete writing - // to sink and the offsets are committed as well, then do next round read. + final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get(); try { - flushFuture.get(); + if (snapshotFlushFuture != null) { + snapshotFlushFuture.get(); + } } catch (ExecutionException ex) { - // log the error, continue execution log.error("execution exception while get flushFuture", ex); throw new Exception("Flush failed", ex.getCause()); } finally { - flushFuture = null; + flushing.set(false); + // Clear only if this is still the current batch future + flushFutureRef.compareAndSet(snapshotFlushFuture, null); Review Comment: No, when calling `flushFutureRef.get()` it already blocks until the flush is ready, so it's safe to set flushing to false unconditionally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org