lokeshj1703 commented on code in PR #17975:
URL: https://github.com/apache/hudi/pull/17975#discussion_r2715756682


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java:
##########
@@ -605,4 +607,67 @@ void mockDescribeTopicConfigs(MockedStatic<AdminClient> staticMock, Map kafkaPar
     when(mock.describeConfigs(Collections.singleton(resource))).thenReturn(mockResult);
     when(mockResult.all()).thenReturn(future);
   }
+
+  @Test
+  public void testKafkaDelayCountMetricEmittedWithLag() {
+    testUtils.createTopic(testTopicName, 1);
+    String[] messages = new String[1000];
+    for (int i = 0; i < 1000; i++) {
+      messages[i] = String.format("{\"id\":\"%d\"}", i);
+    }
+    testUtils.sendMessages(testTopicName, messages);
+
+    HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class);
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));
+
+    // Read first 250 messages, then simulate lag
+    String lastCheckpointString = testTopicName + ",0:250";
+    kafkaOffsetGen.getNextOffsetRanges(
+        Option.of(new StreamerCheckpointV2(lastCheckpointString)), 500, mockMetrics);
+
+    // Verify metric was called with lag count of 750 (1000 - 250)
+    verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 750L);
+  }
+
+  @Test
+  public void testKafkaDelayCountMetricEmittedWithoutCheckpoint() {
+    testUtils.createTopic(testTopicName, 1);
+    String[] messages = new String[1000];
+    for (int i = 0; i < 1000; i++) {
+      messages[i] = String.format("{\"id\":\"%d\"}", i);
+    }
+    testUtils.sendMessages(testTopicName, messages);
+
+    HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class);
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));
+
+    // First run without checkpoint
+    kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 500, mockMetrics);
+
+    // Verify metric was called with 0 (no checkpoint = no lag)
+    verify(mockMetrics, times(1)).updateStreamerSourceDelayCount("kafkaDelayCount", 0L);
+  }
+
+  @Test
+  public void testKafkaDelayCountMetricEmittedWithMultiplePartitions() {
+    testUtils.createTopic(testTopicName, 2);
+    String[] messages = new String[1000];
+    for (int i = 0; i < 1000; i++) {
+      messages[i] = String.format("{\"id\":\"%d\"}", i);
+    }
+    testUtils.sendMessages(testTopicName, messages);
+
+    HoodieIngestionMetrics mockMetrics = mock(HoodieIngestionMetrics.class);
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(getConsumerConfigs("earliest", KAFKA_CHECKPOINT_TYPE_STRING));
+
+    // Checkpoint with some consumed messages, creating lag
+    String lastCheckpointString = testTopicName + ",0:250,1:249";
+    kafkaOffsetGen.getNextOffsetRanges(
+        Option.of(new StreamerCheckpointV2(lastCheckpointString)), 300, mockMetrics);
+
+    // Verify metric was called with a lag count > 0 (exact value depends on partition distribution)
+    ArgumentCaptor<Long> delayCaptor = ArgumentCaptor.forClass(Long.class);
+    verify(mockMetrics, times(1)).updateStreamerSourceDelayCount(eq("kafkaDelayCount"), delayCaptor.capture());
+    assertTrue(delayCaptor.getValue() > 0, "Delay count should be greater than 0 when there is lag");

Review Comment:
   We can add a validation for the exact delay; it would help verify that the 
delay computation is correct for multiple partitions as well.
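
One way to reason about the exact value is sketched below. It assumes the lag is the sum, over partitions, of the end offset minus the checkpointed offset, floored at zero; that model is an assumption about what `delayOffsetCalculation` computes, not something confirmed by this diff. `KafkaLagSketch`, `parseCheckpoint`, and `expectedLag` are hypothetical names used only for illustration, and the end offsets are made-up inputs rather than values read from a broker.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Hudi: models the expected lag for a checkpoint string.
class KafkaLagSketch {

  // Parses a checkpoint of the form "topic,0:250,1:249" into {0=250, 1=249}.
  static Map<Integer, Long> parseCheckpoint(String checkpoint) {
    String[] parts = checkpoint.split(",");
    Map<Integer, Long> offsets = new HashMap<>();
    // parts[0] is the topic name; the remaining entries are partition:offset pairs.
    for (int i = 1; i < parts.length; i++) {
      String[] pair = parts[i].split(":");
      offsets.put(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
    }
    return offsets;
  }

  // Assumed lag model: sum of max(0, endOffset - checkpointedOffset) per partition.
  // Partitions missing from the checkpoint are counted from offset 0.
  static long expectedLag(String checkpoint, Map<Integer, Long> endOffsets) {
    Map<Integer, Long> consumed = parseCheckpoint(checkpoint);
    long lag = 0L;
    for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
      long done = consumed.getOrDefault(entry.getKey(), 0L);
      lag += Math.max(0L, entry.getValue() - done);
    }
    return lag;
  }

  public static void main(String[] args) {
    // Two made-up distributions of the same 1000 messages across 2 partitions.
    Map<Integer, Long> even = new HashMap<>();
    even.put(0, 500L);
    even.put(1, 500L);
    Map<Integer, Long> skewed = new HashMap<>();
    skewed.put(0, 600L);
    skewed.put(1, 400L);

    System.out.println(expectedLag("topic,0:250,1:249", even));
    System.out.println(expectedLag("topic,0:250,1:249", skewed));
  }
}
```

Under this model both distributions give 1000 - (250 + 249) = 501, so the total should not depend on how the messages are split, provided every message lands and each partition's end offset is at least its checkpointed offset. If that holds for `testUtils.sendMessages`, the test could assert `501L` directly instead of capturing the argument and checking `> 0`.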



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java:
##########
@@ -313,10 +313,15 @@ public OffsetRange[] getNextOffsetRanges(Option<Checkpoint> lastCheckpoint, long
       } else {
         lastCheckpointStr = lastCheckpoint.isPresent() ? Option.of(lastCheckpoint.get().getCheckpointKey()) : Option.empty();
       }
+
+      long kafkaDelayCount = 0L;
+      if (lastCheckpointStr.isPresent() && !lastCheckpointStr.get().isEmpty()) {
+        kafkaDelayCount = delayOffsetCalculation(lastCheckpointStr, topicPartitions, consumer);
+      }
+

Review Comment:
   NIT: We can move this code block to where the metrics are emitted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
