This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5cbd1d83ba [HUDI-4873] Report number of messages to be processed via
metrics (#6271)
5cbd1d83ba is described below
commit 5cbd1d83bacda48a07808c10638375b0263c2bfd
Author: Volodymyr Burenin <[email protected]>
AuthorDate: Sat Sep 17 17:59:25 2022 -0500
[HUDI-4873] Report number of messages to be processed via metrics (#6271)
Co-authored-by: Volodymyr Burenin <[email protected]>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java | 6 ++++++
.../main/java/org/apache/hudi/utilities/sources/KafkaSource.java | 2 ++
2 files changed, 8 insertions(+)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
index d361179a1d..2475e92f96 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java
@@ -101,6 +101,12 @@ public class HoodieDeltaStreamerMetrics implements
Serializable {
}
}
+ public void updateDeltaStreamerKafkaMessageInCount(long totalNewMsgCount) {
+ if (config.isMetricsOn()) {
+ Metrics.registerGauge(getMetricsName("deltastreamer",
"kafkaMessageInCount"), totalNewMsgCount);
+ }
+ }
+
public long getDurationInMs(long ctxDuration) {
return ctxDuration / 1000000;
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
index 6f2377fc7c..5561356cab 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/KafkaSource.java
@@ -57,8 +57,10 @@ abstract class KafkaSource<T> extends Source<JavaRDD<T>> {
long totalNewMsgs =
KafkaOffsetGen.CheckpointUtils.totalNewMessages(offsetRanges);
LOG.info("About to read " + totalNewMsgs + " from Kafka for topic :" +
offsetGen.getTopicName());
if (totalNewMsgs <= 0) {
+ metrics.updateDeltaStreamerKafkaMessageInCount(0);
return new InputBatch<>(Option.empty(),
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
}
+ metrics.updateDeltaStreamerKafkaMessageInCount(totalNewMsgs);
JavaRDD<T> newDataRDD = toRDD(offsetRanges);
return new InputBatch<>(Option.of(newDataRDD),
KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges));
} catch (org.apache.kafka.common.errors.TimeoutException e) {