This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new de82f8f40 Add log line for committing/retrieving watermarks in streaming (#3578)
de82f8f40 is described below
commit de82f8f4085c0358f16594147f56d446ba393b28
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Oct 14 11:58:52 2022 -0700
Add log line for committing/retrieving watermarks in streaming (#3578)
---
.../org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java | 1 +
.../source/extractor/extract/kafka/KafkaStreamingExtractor.java | 3 +++
2 files changed, 4 insertions(+)
diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index 2a32f4266..deb93ea1f 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -115,6 +115,7 @@ public class StateStoreBasedWatermarkStorage implements WatermarkStorage {
for (CheckpointableWatermark watermark: watermarks) {
String tableName = watermark.getSource();
_stateStore.put(_storeName, tableName, new
CheckpointableWatermarkState(watermark, GSON));
+ log.info("Committed watermark {} for table {}",
watermark.getWatermark().toString(), tableName);
}
}
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index b2045acbc..87658e09c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -288,6 +288,9 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
longWatermarkMap.put(topicPartition, new LongWatermark(0L));
}
}
+ for (Map.Entry<KafkaPartition, LongWatermark> entry :
longWatermarkMap.entrySet()) {
+ log.info("Retrieved watermark {} for partition {}",
entry.getValue().toString(), entry.getKey().toString());
+ }
return longWatermarkMap;
}