nsivabalan commented on code in PR #17929:
URL: https://github.com/apache/hudi/pull/17929#discussion_r2760794780
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java:
##########
@@ -143,6 +147,89 @@ public void testCreateSource() {
assertEquals(Source.SourceType.ROW, incrSource.getSourceType());
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testHoodieIncrSourceMetricsPublishing(HoodieTableType tableType)
throws IOException {
+ this.tableType = tableType;
+ metaClient = getHoodieMetaClient(storageConf(), basePath());
+ HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10,
12).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(3).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+ List<WriteResult> inserts = new ArrayList<>();
+ // Write 5 commits
+ for (int i = 0; i < 5; i++) {
+ inserts.add(writeRecords(writeClient, tableType, INSERT, null, 100));
+ }
+
+ // Reset mock to clear any previous invocations
+ reset(metrics);
+
+ // Test 1: Read everything from the beginning (no checkpoint)
+ // Should process all 5 commits, with 0 unprocessed commits
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.empty(),
+ 500,
+ inserts.get(4).getInstant());
+ // Verify metrics were published
+ verify(metrics, atLeastOnce()).updateHoodieIncrSourceMetrics(anyLong(),
anyLong());
+
+ // Reset mock for next test
+ reset(metrics);
+
+ // Test 2: Read from checkpoint at instant 2
+ // Should process commits 3 and 4 (2 commits), with 0 unprocessed
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.of(inserts.get(2).getInstant()),
+ 200,
+ inserts.get(4).getInstant());
+ // Verify metrics were called
+ verify(metrics, atLeastOnce()).updateHoodieIncrSourceMetrics(anyLong(),
anyLong());
+
+ // Reset mock for next test
+ reset(metrics);
+
+ // Write 2 more commits
+ inserts.add(writeRecords(writeClient, tableType, INSERT, null, 100));
+ inserts.add(writeRecords(writeClient, tableType, INSERT, null, 100));
+
+ // Test 3: Read from checkpoint at instant 2, but there are more commits
available
+ // Should process commits 3 and 4 (2 commits), with 2 unprocessed
(instant 5 and 6)
+ TypedProperties props = new TypedProperties();
+ props.setProperty("hoodie.streamer.source.hoodieincr.num_instants", "2");
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.of(inserts.get(2).getInstant()),
+ 200,
+ inserts.get(4).getInstant(),
+ Option.empty(),
+ props,
+ HoodieTableVersion.EIGHT);
+ // Verify metrics were called
+ verify(metrics, atLeastOnce()).updateHoodieIncrSourceMetrics(anyLong(),
anyLong());
+
+ // Reset mock for next test
+ reset(metrics);
+
+ // Test 4: Read with READ_LATEST strategy (snapshot query)
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_LATEST,
+ Option.empty(),
+ 100,
+ inserts.get(6).getInstant());
+ // Verify metrics were published
+ verify(metrics, atLeastOnce()).updateHoodieIncrSourceMetrics(anyLong(),
anyLong());
Review Comment:
same here
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/HoodieIncrSource.java:
##########
@@ -288,6 +289,12 @@ private Pair<Option<Dataset<Row>>, Checkpoint>
fetchNextBatchBasedOnCompletionTi
// add filtering so that only interested records are returned.
.filter(String.format("%s IN ('%s')",
HoodieRecord.COMMIT_TIME_METADATA_FIELD,
String.join("','", instantTimeList)));
+
+ // Calculate metrics for snapshot query
+ int numInstantsProcessed = instantTimeList.size();
+ int numUnprocessedInstants = (int) queryContext.getActiveTimeline()
+ .findInstantsAfter(endCompletionTime).countInstants();
+ metricsOption.ifPresent(metrics ->
metrics.updateHoodieIncrSourceMetrics(numInstantsProcessed,
numUnprocessedInstants));
Review Comment:
Don't we need to update the metrics within `fetchNextBatchBasedOnRequestedTime` as well?
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java:
##########
@@ -143,6 +147,89 @@ public void testCreateSource() {
assertEquals(Source.SourceType.ROW, incrSource.getSourceType());
}
+ @ParameterizedTest
+ @EnumSource(HoodieTableType.class)
+ public void testHoodieIncrSourceMetricsPublishing(HoodieTableType tableType)
throws IOException {
+ this.tableType = tableType;
+ metaClient = getHoodieMetaClient(storageConf(), basePath());
+ HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
+
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10,
12).build())
+
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
+ .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+ .withInlineCompaction(false)
+ .withMaxNumDeltaCommitsBeforeCompaction(3).build())
+
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+ .build();
+
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+ List<WriteResult> inserts = new ArrayList<>();
+ // Write 5 commits
+ for (int i = 0; i < 5; i++) {
+ inserts.add(writeRecords(writeClient, tableType, INSERT, null, 100));
+ }
+
+ // Reset mock to clear any previous invocations
+ reset(metrics);
+
+ // Test 1: Read everything from the beginning (no checkpoint)
+ // Should process all 5 commits, with 0 unprocessed commits
+ readAndAssertWithLatestTableVersion(
+ IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
+ Option.empty(),
+ 500,
+ inserts.get(4).getInstant());
+ // Verify metrics were published
+ verify(metrics, atLeastOnce()).updateHoodieIncrSourceMetrics(anyLong(),
anyLong());
Review Comment:
Can we also validate the actual metric values, not just that the method was called?
Something like:
```
MetricRegistry registry = metrics.getMetrics().getRegistry();
assertEquals(1, registry.getGauges().size());
assertEquals(".deltastreamer.errorTableCommitDuration",
registry.getGauges().firstKey());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]