xushiyan commented on code in PR #18130:
URL: https://github.com/apache/hudi/pull/18130#discussion_r2780059004


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java:
##########
@@ -108,6 +112,7 @@ private void processDiscoveredSplits(HoodieContinuousSplitBatch result, Throwabl
 
     if (!result.getSplits().isEmpty()) {
       splitProvider.onDiscoveredSplits(result.getSplits());
+      readerMetrics.setIssuedInstant(position.get().getIssuedInstant().orElse(""));

Review Comment:
   ```suggestion
         position.get().getIssuedInstant().ifPresent(readerMetrics::setIssuedInstant);
   ```
   
   Only emit it when present? We don't want to send an empty instant, since that would make the metrics confusing.
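
   To illustrate the difference between the two forms, here is a minimal sketch. It uses `java.util.Optional` as a stand-in because the diff does not show the return type of `getIssuedInstant()`; `IssuedInstantEmitSketch` and `RecordingMetrics` are hypothetical names for illustration, not classes in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class IssuedInstantEmitSketch {

  // Hypothetical stand-in for the metrics object: records every value it is asked to emit.
  static class RecordingMetrics {
    final List<String> emitted = new ArrayList<>();

    void setIssuedInstant(String instant) {
      emitted.add(instant);
    }
  }

  // Form in the diff: an absent instant is still emitted, as an empty string.
  static void emitWithDefault(Optional<String> issuedInstant, RecordingMetrics metrics) {
    metrics.setIssuedInstant(issuedInstant.orElse(""));
  }

  // Suggested form: the setter is only called when an instant is present.
  static void emitIfPresent(Optional<String> issuedInstant, RecordingMetrics metrics) {
    issuedInstant.ifPresent(metrics::setIssuedInstant);
  }

  public static void main(String[] args) {
    RecordingMetrics withDefault = new RecordingMetrics();
    emitWithDefault(Optional.empty(), withDefault);
    System.out.println(withDefault.emitted.size()); // prints 1 (one empty-string update)

    RecordingMetrics ifPresent = new RecordingMetrics();
    emitIfPresent(Optional.empty(), ifPresent);
    System.out.println(ifPresent.emitted.size()); // prints 0 (no update at all)
  }
}
```

   With the suggested form the gauge keeps its last real value until a new instant is issued.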



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java:
##########
@@ -53,12 +55,14 @@ public HoodieContinuousSplitEnumerator(
       HoodieSplitProvider splitProvider,
       HoodieContinuousSplitDiscover splitDiscover,
       HoodieScanContext scanContext,
-      Option<HoodieSplitEnumeratorState> enumStateOpt) {
+      Option<HoodieSplitEnumeratorState> enumStateOpt,
+      FlinkStreamReadMetrics readerMetrics) {
     super(enumeratorContext, splitProvider);
     this.enumeratorContext = enumeratorContext;
     this.splitProvider = splitProvider;
     this.splitDiscover = splitDiscover;
     this.scanContext = scanContext;
+    this.readerMetrics = readerMetrics;

Review Comment:
   ```suggestion
       this.enumeratorMetrics = enumeratorMetrics;
   ```
   
   This name is more accurate, since the field belongs to the enumerator, not a reader.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java:
##########
@@ -127,7 +128,9 @@ public SimpleVersionedSerializer<HoodieSplitEnumeratorState> getEnumeratorCheckp
 
   @Override
   public SourceReader<T, HoodieSourceSplit> createReader(SourceReaderContext readerContext) throws Exception {
-    return new HoodieSourceReader<T>(recordEmitter, scanContext.getConf(), readerContext, readerFunction, splitComparator);
+    FlinkStreamReadMetrics readMetrics = new FlinkStreamReadMetrics(readerContext.metricGroup());
+    readMetrics.registerMetrics();

Review Comment:
     public void registerMetrics() {
       metricGroup.gauge("issuedInstantDelay", () -> issuedInstantDelay);
       metricGroup.gauge("issuedInstant", () -> issuedInstant);
       metricGroup.gauge("splitLatestCommit", () -> splitLatestCommit);
       metricGroup.gauge("splitLatestCommitDelay", () -> splitLatestCommitDelay);
     }
   
   
   `registerMetrics()` registers 4 gauges, but I only saw `setIssuedInstant()` and `setSplitLatestCommit()` being called. How are the other 2 emitted?
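
   For reference, one way the two delay gauges could be populated without extra setters is to derive them inside the existing setters. This is only a sketch under assumptions: I have not confirmed that `FlinkStreamReadMetrics` works this way, the map below is a stand-in for Flink's `MetricGroup`, the injected clock is hypothetical, and instants are simplified to epoch milliseconds rather than Hudi instant strings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class DerivedDelayGaugeSketch {
  // Stand-in for a metric group: gauge name -> supplier read at report time.
  private final Map<String, Supplier<Long>> gauges = new HashMap<>();
  // Hypothetical injectable clock (epoch millis), so the delay is deterministic in tests.
  private final LongSupplier clock;

  private long issuedInstant;
  private long issuedInstantDelay;
  private long splitLatestCommit;
  private long splitLatestCommitDelay;

  DerivedDelayGaugeSketch(LongSupplier clock) {
    this.clock = clock;
  }

  // Mirrors the four gauge names quoted in the review comment.
  void registerMetrics() {
    gauges.put("issuedInstant", () -> issuedInstant);
    gauges.put("issuedInstantDelay", () -> issuedInstantDelay);
    gauges.put("splitLatestCommit", () -> splitLatestCommit);
    gauges.put("splitLatestCommitDelay", () -> splitLatestCommitDelay);
  }

  // Assumption: the delay is computed at set time as "now minus instant".
  void setIssuedInstant(long instantEpochMillis) {
    issuedInstant = instantEpochMillis;
    issuedInstantDelay = clock.getAsLong() - instantEpochMillis;
  }

  void setSplitLatestCommit(long commitEpochMillis) {
    splitLatestCommit = commitEpochMillis;
    splitLatestCommitDelay = clock.getAsLong() - commitEpochMillis;
  }

  // Reads the current value of a registered gauge.
  long gauge(String name) {
    return gauges.get(name).get();
  }

  public static void main(String[] args) {
    DerivedDelayGaugeSketch metrics = new DerivedDelayGaugeSketch(() -> 10_000L);
    metrics.registerMetrics();
    metrics.setIssuedInstant(4_000L);
    System.out.println(metrics.gauge("issuedInstantDelay")); // prints 6000
  }
}
```

   If the delays are already derived like this in the PR, a short comment on `registerMetrics()` would make that clear.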



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java:
##########
@@ -56,13 +56,15 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
   private HoodieSourceSplit currentSplit;
   private String currentSplitId;
   private CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> currentReader;
+  private FlinkStreamReadMetrics readerMetrics;

Review Comment:
   ```suggestion
     private final FlinkStreamReadMetrics readerMetrics;
   ```


