xushiyan commented on code in PR #18130:
URL: https://github.com/apache/hudi/pull/18130#discussion_r2780059004
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java:
##########
@@ -108,6 +112,7 @@ private void processDiscoveredSplits(HoodieContinuousSplitBatch result, Throwabl
if (!result.getSplits().isEmpty()) {
splitProvider.onDiscoveredSplits(result.getSplits());
+ readerMetrics.setIssuedInstant(position.get().getIssuedInstant().orElse(""));
Review Comment:
```suggestion
position.get().getIssuedInstant().ifPresent(readerMetrics::setIssuedInstant);
```
Only emit it when present? We don't want to send an empty instant, which would make the metrics confusing.
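
To illustrate the difference between the two forms, here is a minimal sketch. It uses `java.util.Optional` as a stand-in, since the return type of `getIssuedInstant()` is not visible in the diff, and `RecordingMetrics` is a hypothetical substitute for the real metrics class that simply records what it is given.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the metrics class: records every value passed to the setter.
class RecordingMetrics {
    final List<String> emitted = new ArrayList<>();

    void setIssuedInstant(String instant) {
        emitted.add(instant);
    }
}

class IssuedInstantDemo {
    // Form in the PR: the setter is always called, with "" when no instant exists.
    static void emitAlways(Optional<String> instant, RecordingMetrics metrics) {
        metrics.setIssuedInstant(instant.orElse(""));
    }

    // Suggested form: the setter is called only when an instant is present.
    static void emitIfPresent(Optional<String> instant, RecordingMetrics metrics) {
        instant.ifPresent(metrics::setIssuedInstant);
    }

    public static void main(String[] args) {
        RecordingMetrics always = new RecordingMetrics();
        emitAlways(Optional.empty(), always);

        RecordingMetrics guarded = new RecordingMetrics();
        emitIfPresent(Optional.empty(), guarded);

        // The first form records one empty value; the second records nothing.
        System.out.println(always.emitted.size());
        System.out.println(guarded.emitted.size());
    }
}
```

With the guarded form, the gauge keeps its previous value until a real instant arrives, so an empty string never overwrites it.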
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/enumerator/HoodieContinuousSplitEnumerator.java:
##########
@@ -53,12 +55,14 @@ public HoodieContinuousSplitEnumerator(
HoodieSplitProvider splitProvider,
HoodieContinuousSplitDiscover splitDiscover,
HoodieScanContext scanContext,
- Option<HoodieSplitEnumeratorState> enumStateOpt) {
+ Option<HoodieSplitEnumeratorState> enumStateOpt,
+ FlinkStreamReadMetrics readerMetrics) {
super(enumeratorContext, splitProvider);
this.enumeratorContext = enumeratorContext;
this.splitProvider = splitProvider;
this.splitDiscover = splitDiscover;
this.scanContext = scanContext;
+ this.readerMetrics = readerMetrics;
Review Comment:
```suggestion
this.enumeratorMetrics = enumeratorMetrics;
```
`enumeratorMetrics` is a more accurate name, since this instance is held by the enumerator.
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/HoodieSource.java:
##########
@@ -127,7 +128,9 @@ public SimpleVersionedSerializer<HoodieSplitEnumeratorState> getEnumeratorCheckp
@Override
public SourceReader<T, HoodieSourceSplit> createReader(SourceReaderContext readerContext) throws Exception {
- return new HoodieSourceReader<T>(recordEmitter, scanContext.getConf(), readerContext, readerFunction, splitComparator);
+ FlinkStreamReadMetrics readMetrics = new FlinkStreamReadMetrics(readerContext.metricGroup());
+ readMetrics.registerMetrics();
Review Comment:
public void registerMetrics() {
metricGroup.gauge("issuedInstantDelay", () -> issuedInstantDelay);
metricGroup.gauge("issuedInstant", () -> issuedInstant);
metricGroup.gauge("splitLatestCommit", () -> splitLatestCommit);
metricGroup.gauge("splitLatestCommitDelay", () -> splitLatestCommitDelay);
}
`registerMetrics()` registers four gauges, but I only see `setIssuedInstant()` and
`setSplitLatestCommit()` being called. How are the other two (`issuedInstantDelay` and
`splitLatestCommitDelay`) emitted?
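
One plausible answer is that each setter also derives its delay gauge. The quoted diff does not show whether `FlinkStreamReadMetrics` does this, so the following is only a sketch under assumptions: `DelayTrackingMetrics` and its injected clock are hypothetical, the instant is assumed to be in `yyyyMMddHHmmssSSS` form, and it is interpreted as UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.LongSupplier;

// Hypothetical metrics holder: setting an instant also refreshes its delay value.
class DelayTrackingMetrics {
    private static final DateTimeFormatter SECONDS_FORMAT =
        DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    private final LongSupplier clock; // injected so the delay can be tested deterministically
    private volatile long issuedInstant;
    private volatile long issuedInstantDelay;

    DelayTrackingMetrics(LongSupplier clock) {
        this.clock = clock;
    }

    // Assumption: the instant is a 17-digit yyyyMMddHHmmssSSS string in UTC.
    static long toEpochMillis(String instant) {
        LocalDateTime seconds = LocalDateTime.parse(instant.substring(0, 14), SECONDS_FORMAT);
        long millis = Long.parseLong(instant.substring(14, 17));
        return seconds.toInstant(ZoneOffset.UTC).toEpochMilli() + millis;
    }

    // Updates both the instant value and the delay derived from it.
    void setIssuedInstant(String instant) {
        long instantMillis = toEpochMillis(instant);
        this.issuedInstant = instantMillis;
        this.issuedInstantDelay = clock.getAsLong() - instantMillis;
    }

    long getIssuedInstant() {
        return issuedInstant;
    }

    long getIssuedInstantDelay() {
        return issuedInstantDelay;
    }

    public static void main(String[] args) {
        DelayTrackingMetrics metrics = new DelayTrackingMetrics(System::currentTimeMillis);
        metrics.setIssuedInstant("20240101000000000");
        System.out.println(metrics.getIssuedInstantDelay() > 0);
    }
}
```

If the delays are computed this way, two setters are enough to feed all four gauges; if not, the two delay gauges would stay at their initial values.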
##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/reader/HoodieSourceSplitReader.java:
##########
@@ -56,13 +56,15 @@ public class HoodieSourceSplitReader<T> implements SplitReader<HoodieRecordWithP
private HoodieSourceSplit currentSplit;
private String currentSplitId;
private CloseableIterator<RecordsWithSplitIds<HoodieRecordWithPosition<T>>> currentReader;
+ private FlinkStreamReadMetrics readerMetrics;
Review Comment:
```suggestion
private final FlinkStreamReadMetrics readerMetrics;
```