Copilot commented on code in PR #24654: URL: https://github.com/apache/pulsar/pull/24654#discussion_r2324268637
########## pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/AbstractKafkaConnectSource.java: ########## @@ -162,38 +165,103 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws sourceTask.initialize(sourceTaskContext); sourceTask.start(taskConfig); } + private void onOffsetsFlushed(Throwable error, CompletableFuture<Void> snapshotFlushFuture) { + if (error != null) { + log.error("Failed to flush offsets to storage: ", error); + offsetWriter.cancelFlush(); + snapshotFlushFuture.completeExceptionally(new Exception("No Offsets Added Error", error)); + return; + } + try { + sourceTask.commit(); + if (log.isDebugEnabled()) { + log.debug("Finished flushing offsets to storage"); + } + snapshotFlushFuture.complete(null); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.warn("Flush interrupted, cancelling", ie); + offsetWriter.cancelFlush(); + snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", ie)); + } catch (Throwable t) { + log.warn("Flush failed, cancelling", t); + offsetWriter.cancelFlush(); + snapshotFlushFuture.completeExceptionally(new Exception("Failed to commit offsets", t)); + } + } + + private void triggerOffsetsFlushIfNeeded() { + final CompletableFuture<Void> snapshotFlushFuture = flushFutureRef.get(); + // Only flush when we have a batch in flight, nothing outstanding, and a pending future + if (snapshotFlushFuture == null || snapshotFlushFuture.isDone() || outstandingRecords.get() != 0) { + return; + } + if (!flushing.compareAndSet(false, true)) { + return; // someone else is flushing + } + try { + if (offsetWriter.beginFlush()) { + offsetWriter.doFlush((error, ignored) -> { + try { + onOffsetsFlushed(error, snapshotFlushFuture); + } finally { + flushing.set(false); + } + }); + } else { + try { + onOffsetsFlushed(null, snapshotFlushFuture); + } finally { + flushing.set(false); + } + } + } catch (ConnectException alreadyFlushing) { 
+ // Another thread initiated the flush; let their callback complete the future. + // Keep 'flushing' = true until read() finalizes the batch. + } catch (Exception t) { + try { + onOffsetsFlushed(t, snapshotFlushFuture); + } finally { + flushing.set(false); + } + } + } @Override public synchronized Record<T> read() throws Exception { while (true) { if (currentBatch == null) { - flushFuture = new CompletableFuture<>(); List<SourceRecord> recordList = sourceTask.poll(); if (recordList == null || recordList.isEmpty()) { continue; } outstandingRecords.addAndGet(recordList.size()); currentBatch = recordList.iterator(); + + final CompletableFuture<Void> newFuture = new CompletableFuture<>(); + flushFutureRef.set(newFuture); } if (currentBatch.hasNext()) { AbstractKafkaSourceRecord<T> processRecord = processSourceRecord(currentBatch.next()); if (processRecord == null || processRecord.isEmpty()) { outstandingRecords.decrementAndGet(); - continue; + triggerOffsetsFlushIfNeeded(); Review Comment: The `continue` statement was removed but the logic still needs to skip to the next iteration when a record is filtered out. Add `continue;` after the flush trigger to maintain the original loop behavior. 
```suggestion triggerOffsetsFlushIfNeeded(); continue; ``` ########## pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java: ########## @@ -96,6 +103,182 @@ public void testOpenAndReadConnectorConfig() throws Exception { testOpenAndReadTask(config); } + + @Test(timeOut = 30000) + public void testFlushWhenAllMessagesFilteredWithoutBlocking() throws Exception { + + Map<String, Object> config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + + config.put("transforms", "Filter"); + config.put("transforms.Filter.type", "org.apache.kafka.connect.transforms.Filter"); + config.put("transforms.Filter.predicate", "DropMeTopic"); + + config.put("predicates", "DropMeTopic"); + config.put("predicates.DropMeTopic.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); + config.put("predicates.DropMeTopic.pattern", ".*my-property/my-ns/kafka-connect-source.*"); + + kafkaConnectSource = new KafkaConnectSource(); + kafkaConnectSource.open(config, context); + + OffsetStorageWriter original = kafkaConnectSource.getOffsetWriter(); + OffsetStorageWriter spyWriter = org.mockito.Mockito.spy(original); + java.lang.reflect.Field f = AbstractKafkaConnectSource.class.getDeclaredField("offsetWriter"); + f.setAccessible(true); + f.set(kafkaConnectSource, spyWriter); + + try (OutputStream os = Files.newOutputStream(tempFile.toPath())) { + os.write("first\n".getBytes()); + os.flush(); + os.write("second\n".getBytes()); + os.flush(); + } + + Thread t = new Thread(() -> { + try { + kafkaConnectSource.read(); + } catch (Exception ignored) { + // Ignore, it just needs the loop running through a batch where every message is filtered out Review Comment: Catching and ignoring all exceptions can hide important test failures. Consider catching more specific exceptions or at least logging the exception to aid in debugging test failures. 
```suggestion } catch (Exception e) { log.error("Exception in read thread", e); ``` ########## pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSourceTest.java: ########## @@ -96,6 +103,182 @@ public void testOpenAndReadConnectorConfig() throws Exception { testOpenAndReadTask(config); } + + @Test(timeOut = 30000) + public void testFlushWhenAllMessagesFilteredWithoutBlocking() throws Exception { + + Map<String, Object> config = getConfig(); + config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask"); + + config.put("transforms", "Filter"); + config.put("transforms.Filter.type", "org.apache.kafka.connect.transforms.Filter"); + config.put("transforms.Filter.predicate", "DropMeTopic"); + + config.put("predicates", "DropMeTopic"); + config.put("predicates.DropMeTopic.type", "org.apache.kafka.connect.transforms.predicates.TopicNameMatches"); + config.put("predicates.DropMeTopic.pattern", ".*my-property/my-ns/kafka-connect-source.*"); + + kafkaConnectSource = new KafkaConnectSource(); + kafkaConnectSource.open(config, context); + + OffsetStorageWriter original = kafkaConnectSource.getOffsetWriter(); + OffsetStorageWriter spyWriter = org.mockito.Mockito.spy(original); + java.lang.reflect.Field f = AbstractKafkaConnectSource.class.getDeclaredField("offsetWriter"); + f.setAccessible(true); + f.set(kafkaConnectSource, spyWriter); Review Comment: This reflection-based field access pattern is repeated in multiple tests. Extract this spy setup logic into a private helper method to reduce code duplication and improve maintainability. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org