Copilot commented on code in PR #14816:
URL: https://github.com/apache/iceberg/pull/14816#discussion_r2751211719
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java:
##########
@@ -110,14 +202,60 @@ protected boolean receive(Envelope envelope) {
events.add(readyEvent);
send(events, results.sourceOffsets());
+ }
+
+ @Override
+ protected boolean receive(Envelope envelope) {
+ Event event = envelope.event();
+ PayloadType type = event.payload().type();
- return true;
+ // Only buffer control events that need processing
+ if (type == PayloadType.START_COMMIT
+ || type == PayloadType.COMMIT_COMPLETE
+ || type == PayloadType.COMMIT_TO_TABLE) {
+ controlEventQueue.offer(envelope);
+ LOG.info("Worker {} buffered control event: {}", workerIdentifier, type);
+ return true;
+ }
+
+ return false;
}
@Override
void stop() {
- super.stop();
+ LOG.info("Worker {} stopping.", workerIdentifier);
+ terminateBackGroundPolling();
sinkWriter.close();
+ super.stop();
+ LOG.info("Worker {} stopped.", workerIdentifier);
+ }
+
+ private void terminateBackGroundPolling() {
+ running.set(false);
+ pollingExecutor.shutdownNow();
+ try {
+ if (!pollingExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
+ LOG.warn(
+            "Polling thread did not terminate in time on worker {}, forcing shutdown",
+ workerIdentifier);
+ throw new ConnectException(
+ String.format(
+                "Background polling thread of worker %s did not terminated gracefully.",
Review Comment:
Corrected grammar from 'did not terminated' to 'did not terminate'.
```suggestion
"Background polling thread of worker %s did not terminate
gracefully.",
```
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java:
##########
@@ -111,6 +116,314 @@ public void testSave() {
assertThat(dataComplete.commitId()).isEqualTo(commitId);
assertThat(dataComplete.assignments()).hasSize(1);
assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testBackgroundPollingBuffersEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+    when(config.controlPollIntervalMs()).thenReturn(50);
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // Add multiple events to consumer
+      UUID commitId1 = UUID.randomUUID();
+      Event event1 = new Event(config.connectGroupId(), new StartCommit(commitId1));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event1)));
+
+      UUID commitId2 = UUID.randomUUID();
+      Event event2 = new Event(config.connectGroupId(), new StartCommit(commitId2));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(event2)));
+
+ // Wait for background polling to buffer events
+ Thread.sleep(300);
+
+ // Process should handle both buffered events
+ worker.process();
+
+ // Should have 2 DATA_COMPLETE events (one per commit)
+ assertThat(producer.history().size()).isGreaterThanOrEqualTo(2);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testWorkerIgnoresNonRelevantEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Add events with different group IDs (should be ignored)
+ UUID commitId = UUID.randomUUID();
+ Event event = new Event("different-group-id", new StartCommit(commitId));
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event)));
+
+ Thread.sleep(200);
+ worker.process();
+
+ // Should not produce any events since the group ID doesn't match
+ assertThat(producer.history()).isEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerGracefulShutdown() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Stop worker immediately
+ worker.stop();
+
+ // Should complete without exceptions
+ assertThat(producer.history()).isEmpty();
+ }
+ }
+
+ @Test
+  public void testWorkerProcessesMultipleEventTypes() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      UUID commitId = UUID.randomUUID();
+
+      // Add START_COMMIT event
+      Event startCommit = new Event(config.connectGroupId(), new StartCommit(commitId));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(startCommit)));
+
+      // Add COMMIT_COMPLETE event
+      Event commitComplete =
+          new Event(config.connectGroupId(), new CommitComplete(commitId, EventTestUtil.now()));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(commitComplete)));
+
+      // Add COMMIT_TO_TABLE event
+      Event commitToTable =
+          new Event(
+              config.connectGroupId(),
+              new CommitToTable(
+                  commitId,
+                  TableReference.of("catalog", TableIdentifier.parse(TABLE_NAME)),
+                  1L,
+                  EventTestUtil.now()));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 3, "key", AvroUtil.encode(commitToTable)));
+
+ Thread.sleep(300);
Review Comment:
Using `Thread.sleep()` in tests creates timing-dependent behavior that can
lead to flaky tests. Consider using a polling mechanism with a timeout to
verify events have been buffered.
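A bounded polling wait could replace the fixed sleep; as a minimal sketch (the `waitFor` helper and the 5-second timeout are hypothetical, not part of this PR):
```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper: poll a condition until it holds, failing after a timeout.
private static void waitFor(BooleanSupplier condition, Duration timeout) throws InterruptedException {
  long deadline = System.nanoTime() + timeout.toNanos();
  while (!condition.getAsBoolean()) {
    if (System.nanoTime() - deadline > 0) {
      throw new AssertionError("Condition not met within " + timeout);
    }
    Thread.sleep(10); // short poll interval reacts quickly once the condition holds
  }
}

// Possible usage in place of Thread.sleep(300) followed by worker.process():
// waitFor(
//     () -> {
//       worker.process(); // drain whatever the background poller has buffered so far
//       return producer.history().size() >= 2;
//     },
//     Duration.ofSeconds(5));
```
This keeps the worst-case wait bounded while letting the test proceed as soon as both events are visible.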
##########
kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/Worker.java:
##########
@@ -59,19 +76,96 @@ class Worker extends Channel {
this.config = config;
this.context = context;
this.sinkWriter = sinkWriter;
+
+ this.workerIdentifier = config.connectorName() + "-" + config.taskId();
+ // Initialize async processing components
+ this.controlEventQueue = new ConcurrentLinkedQueue<>();
+ this.running = new AtomicBoolean(false);
+ this.pollInterval = Duration.ofMillis(config.controlPollIntervalMs());
+ this.pollingExecutor =
+ Executors.newSingleThreadExecutor(
+ r -> {
+ Thread thread =
+ new Thread(
+                      r, "worker-control-poller-" + config.connectorName() + "-" + config.taskId());
+ thread.setDaemon(true); // Not daemon to ensure graceful shutdown
Review Comment:
The comment states 'Not daemon to ensure graceful shutdown' but the code
sets `setDaemon(true)`. The comment should say 'Daemon thread for automatic
cleanup' or the code should use `setDaemon(false)` if graceful shutdown is
required.
```suggestion
thread.setDaemon(true); // Daemon thread for automatic cleanup
```
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java:
##########
@@ -111,6 +116,314 @@ public void testSave() {
assertThat(dataComplete.commitId()).isEqualTo(commitId);
assertThat(dataComplete.assignments()).hasSize(1);
assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testBackgroundPollingBuffersEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+    when(config.controlPollIntervalMs()).thenReturn(50);
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // Add multiple events to consumer
+      UUID commitId1 = UUID.randomUUID();
+      Event event1 = new Event(config.connectGroupId(), new StartCommit(commitId1));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event1)));
+
+      UUID commitId2 = UUID.randomUUID();
+      Event event2 = new Event(config.connectGroupId(), new StartCommit(commitId2));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(event2)));
+
+ // Wait for background polling to buffer events
+ Thread.sleep(300);
Review Comment:
Using `Thread.sleep()` in tests creates timing-dependent behavior that can
lead to flaky tests. Consider using a polling mechanism with a timeout to
verify events have been buffered.
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java:
##########
@@ -111,6 +116,314 @@ public void testSave() {
assertThat(dataComplete.commitId()).isEqualTo(commitId);
assertThat(dataComplete.assignments()).hasSize(1);
assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testBackgroundPollingBuffersEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+    when(config.controlPollIntervalMs()).thenReturn(50);
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // Add multiple events to consumer
+      UUID commitId1 = UUID.randomUUID();
+      Event event1 = new Event(config.connectGroupId(), new StartCommit(commitId1));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event1)));
+
+      UUID commitId2 = UUID.randomUUID();
+      Event event2 = new Event(config.connectGroupId(), new StartCommit(commitId2));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(event2)));
+
+ // Wait for background polling to buffer events
+ Thread.sleep(300);
+
+ // Process should handle both buffered events
+ worker.process();
+
+ // Should have 2 DATA_COMPLETE events (one per commit)
+ assertThat(producer.history().size()).isGreaterThanOrEqualTo(2);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testWorkerIgnoresNonRelevantEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Add events with different group IDs (should be ignored)
+ UUID commitId = UUID.randomUUID();
+ Event event = new Event("different-group-id", new StartCommit(commitId));
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event)));
+
+ Thread.sleep(200);
+ worker.process();
+
+ // Should not produce any events since the group ID doesn't match
+ assertThat(producer.history()).isEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerGracefulShutdown() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Stop worker immediately
+ worker.stop();
+
+ // Should complete without exceptions
+ assertThat(producer.history()).isEmpty();
+ }
+ }
+
+ @Test
+  public void testWorkerProcessesMultipleEventTypes() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      UUID commitId = UUID.randomUUID();
+
+      // Add START_COMMIT event
+      Event startCommit = new Event(config.connectGroupId(), new StartCommit(commitId));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(startCommit)));
+
+      // Add COMMIT_COMPLETE event
+      Event commitComplete =
+          new Event(config.connectGroupId(), new CommitComplete(commitId, EventTestUtil.now()));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(commitComplete)));
+
+      // Add COMMIT_TO_TABLE event
+      Event commitToTable =
+          new Event(
+              config.connectGroupId(),
+              new CommitToTable(
+                  commitId,
+                  TableReference.of("catalog", TableIdentifier.parse(TABLE_NAME)),
+                  1L,
+                  EventTestUtil.now()));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 3, "key", AvroUtil.encode(commitToTable)));
+
+ Thread.sleep(300);
+
+ // All events should be buffered
+ worker.process();
+
+ // Should have processed the START_COMMIT event
+ assertThat(producer.history()).isNotEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerHandlesEmptyQueue() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Call process multiple times with no events
+ worker.process();
+ worker.process();
+ worker.process();
+
+ assertThat(producer.history()).isEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerWithCustomPollInterval() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+ when(config.controlPollIntervalMs()).thenReturn(1000); // 1 second
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+ when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+ when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ UUID commitId = UUID.randomUUID();
+      Event event = new Event(config.connectGroupId(), new StartCommit(commitId));
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event)));
+
+ // Wait for longer than poll interval to ensure event is buffered
+ Thread.sleep(1500);
+
+ worker.process();
+
+ assertThat(producer.history()).isNotEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerMultipleStartCommits() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+ when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+ IcebergWriterResult writeResult1 =
+ new IcebergWriterResult(
+ TableIdentifier.parse(TABLE_NAME),
+ ImmutableList.of(EventTestUtil.createDataFile()),
+ ImmutableList.of(),
+ StructType.of());
+
+ IcebergWriterResult writeResult2 =
+ new IcebergWriterResult(
+ TableIdentifier.parse(TABLE_NAME),
+ ImmutableList.of(EventTestUtil.createDataFile()),
+ ImmutableList.of(),
+ StructType.of());
+
+ Map<TopicPartition, Offset> offsets =
+ ImmutableMap.of(topicPartition, new Offset(1L, EventTestUtil.now()));
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+ when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(writeResult1), offsets))
+          .thenReturn(new SinkWriterResult(ImmutableList.of(writeResult2), offsets));
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Add multiple START_COMMIT events
+ UUID commitId1 = UUID.randomUUID();
+      Event event1 = new Event(config.connectGroupId(), new StartCommit(commitId1));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event1)));
+
+      UUID commitId2 = UUID.randomUUID();
+      Event event2 = new Event(config.connectGroupId(), new StartCommit(commitId2));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(event2)));
+
+ Thread.sleep(300);
Review Comment:
Using `Thread.sleep()` in tests creates timing-dependent behavior that can
lead to flaky tests. Consider using a polling mechanism with a timeout to
verify events have been buffered.
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java:
##########
@@ -96,6 +99,8 @@ public void testSave() {
byte[] bytes = AvroUtil.encode(commitRequest);
    consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes));
+ // Give background thread time to poll and buffer the event
+ Thread.sleep(500);
Review Comment:
Using `Thread.sleep()` in tests creates timing-dependent behavior that can
lead to flaky tests. Consider using a polling mechanism with a timeout to
verify the event has been buffered, or use test utilities like Awaitility to
wait for the expected condition.
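For example, with Awaitility (assuming it were added as a test dependency; the 5-second cap is illustrative):
```java
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.time.Duration;

// Instead of a fixed Thread.sleep(500), retry until the background poller has
// buffered the event and process() produces the expected response events.
await()
    .atMost(Duration.ofSeconds(5))
    .untilAsserted(
        () -> {
          worker.process(); // drains whatever has been buffered so far
          assertThat(producer.history()).isNotEmpty();
        });
```
`untilAsserted` retries on `AssertionError`, so the test passes as soon as the condition holds and fails only after the cap expires.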
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java:
##########
@@ -111,6 +116,314 @@ public void testSave() {
assertThat(dataComplete.commitId()).isEqualTo(commitId);
assertThat(dataComplete.assignments()).hasSize(1);
assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testBackgroundPollingBuffersEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+    when(config.controlPollIntervalMs()).thenReturn(50);
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // Add multiple events to consumer
+      UUID commitId1 = UUID.randomUUID();
+      Event event1 = new Event(config.connectGroupId(), new StartCommit(commitId1));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event1)));
+
+      UUID commitId2 = UUID.randomUUID();
+      Event event2 = new Event(config.connectGroupId(), new StartCommit(commitId2));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(event2)));
+
+ // Wait for background polling to buffer events
+ Thread.sleep(300);
+
+ // Process should handle both buffered events
+ worker.process();
+
+ // Should have 2 DATA_COMPLETE events (one per commit)
+ assertThat(producer.history().size()).isGreaterThanOrEqualTo(2);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testWorkerIgnoresNonRelevantEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Add events with different group IDs (should be ignored)
+ UUID commitId = UUID.randomUUID();
+ Event event = new Event("different-group-id", new StartCommit(commitId));
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event)));
+
+ Thread.sleep(200);
Review Comment:
Using `Thread.sleep()` in tests creates timing-dependent behavior that can
lead to flaky tests. Consider using a polling mechanism with a timeout to
verify the queue remains empty.
##########
kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestWorker.java:
##########
@@ -111,6 +116,314 @@ public void testSave() {
assertThat(dataComplete.commitId()).isEqualTo(commitId);
assertThat(dataComplete.assignments()).hasSize(1);
assertThat(dataComplete.assignments().get(0).offset()).isEqualTo(1L);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testBackgroundPollingBuffersEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+    when(config.controlPollIntervalMs()).thenReturn(50);
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      // Add multiple events to consumer
+      UUID commitId1 = UUID.randomUUID();
+      Event event1 = new Event(config.connectGroupId(), new StartCommit(commitId1));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event1)));
+
+      UUID commitId2 = UUID.randomUUID();
+      Event event2 = new Event(config.connectGroupId(), new StartCommit(commitId2));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(event2)));
+
+ // Wait for background polling to buffer events
+ Thread.sleep(300);
+
+ // Process should handle both buffered events
+ worker.process();
+
+ // Should have 2 DATA_COMPLETE events (one per commit)
+ assertThat(producer.history().size()).isGreaterThanOrEqualTo(2);
+
+ worker.stop();
+ }
+ }
+
+ @Test
+  public void testWorkerIgnoresNonRelevantEvents() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Add events with different group IDs (should be ignored)
+ UUID commitId = UUID.randomUUID();
+ Event event = new Event("different-group-id", new StartCommit(commitId));
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event)));
+
+ Thread.sleep(200);
+ worker.process();
+
+ // Should not produce any events since the group ID doesn't match
+ assertThat(producer.history()).isEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerGracefulShutdown() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Stop worker immediately
+ worker.stop();
+
+ // Should complete without exceptions
+ assertThat(producer.history()).isEmpty();
+ }
+ }
+
+ @Test
+  public void testWorkerProcessesMultipleEventTypes() throws InterruptedException {
+    when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+      mockKafkaUtils
+          .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+          .thenReturn(consumerGroupMetadata);
+
+      SinkTaskContext context = mock(SinkTaskContext.class);
+      TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+      when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+      SinkWriter sinkWriter = mock(SinkWriter.class);
+      when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+      initConsumer();
+
+      Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+      worker.start();
+
+      UUID commitId = UUID.randomUUID();
+
+      // Add START_COMMIT event
+      Event startCommit = new Event(config.connectGroupId(), new StartCommit(commitId));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(startCommit)));
+
+      // Add COMMIT_COMPLETE event
+      Event commitComplete =
+          new Event(config.connectGroupId(), new CommitComplete(commitId, EventTestUtil.now()));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 2, "key", AvroUtil.encode(commitComplete)));
+
+      // Add COMMIT_TO_TABLE event
+      Event commitToTable =
+          new Event(
+              config.connectGroupId(),
+              new CommitToTable(
+                  commitId,
+                  TableReference.of("catalog", TableIdentifier.parse(TABLE_NAME)),
+                  1L,
+                  EventTestUtil.now()));
+      consumer.addRecord(
+          new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 3, "key", AvroUtil.encode(commitToTable)));
+
+ Thread.sleep(300);
+
+ // All events should be buffered
+ worker.process();
+
+ // Should have processed the START_COMMIT event
+ assertThat(producer.history()).isNotEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerHandlesEmptyQueue() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ when(context.assignment()).thenReturn(ImmutableSet.of());
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ // Call process multiple times with no events
+ worker.process();
+ worker.process();
+ worker.process();
+
+ assertThat(producer.history()).isEmpty();
+
+ worker.stop();
+ }
+ }
+
+ @Test
+ public void testWorkerWithCustomPollInterval() throws InterruptedException {
+ when(config.catalogName()).thenReturn("catalog");
+ when(config.controlPollIntervalMs()).thenReturn(1000); // 1 second
+
+    try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
+      ConsumerGroupMetadata consumerGroupMetadata = mock(ConsumerGroupMetadata.class);
+ mockKafkaUtils
+ .when(() -> KafkaUtils.consumerGroupMetadata(any()))
+ .thenReturn(consumerGroupMetadata);
+
+ SinkTaskContext context = mock(SinkTaskContext.class);
+ TopicPartition topicPartition = new TopicPartition(SRC_TOPIC_NAME, 0);
+ when(context.assignment()).thenReturn(ImmutableSet.of(topicPartition));
+
+ SinkWriter sinkWriter = mock(SinkWriter.class);
+ when(sinkWriter.completeWrite())
+          .thenReturn(new SinkWriterResult(ImmutableList.of(), ImmutableMap.of()));
+
+ initConsumer();
+
+ Worker worker = new Worker(config, clientFactory, sinkWriter, context);
+ worker.start();
+
+ UUID commitId = UUID.randomUUID();
+      Event event = new Event(config.connectGroupId(), new StartCommit(commitId));
+      consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", AvroUtil.encode(event)));
+
+ // Wait for longer than poll interval to ensure event is buffered
+ Thread.sleep(1500);
Review Comment:
Using `Thread.sleep()` in tests creates timing-dependent behavior that can
lead to flaky tests. Consider using a polling mechanism with a timeout to
verify the event has been buffered.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]