This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new de82f8f40 Add log line for committing/retrieving watermarks in streaming (#3578)
de82f8f40 is described below

commit de82f8f4085c0358f16594147f56d446ba393b28
Author: Jack Moseley <[email protected]>
AuthorDate: Fri Oct 14 11:58:52 2022 -0700

    Add log line for committing/retrieving watermarks in streaming (#3578)
---
 .../org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java    | 1 +
 .../source/extractor/extract/kafka/KafkaStreamingExtractor.java        | 3 +++
 2 files changed, 4 insertions(+)

diff --git a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
index 2a32f4266..deb93ea1f 100644
--- a/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
+++ b/gobblin-metastore/src/main/java/org/apache/gobblin/runtime/StateStoreBasedWatermarkStorage.java
@@ -115,6 +115,7 @@ public class StateStoreBasedWatermarkStorage implements WatermarkStorage {
     for (CheckpointableWatermark watermark: watermarks) {
       String tableName = watermark.getSource();
       _stateStore.put(_storeName, tableName, new 
CheckpointableWatermarkState(watermark, GSON));
+      log.info("Committed watermark {} for table {}", 
watermark.getWatermark().toString(), tableName);
     }
   }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index b2045acbc..87658e09c 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -288,6 +288,9 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
         longWatermarkMap.put(topicPartition, new LongWatermark(0L));
       }
     }
+    for (Map.Entry<KafkaPartition, LongWatermark> entry : 
longWatermarkMap.entrySet()) {
+      log.info("Retrieved watermark {} for partition {}", 
entry.getValue().toString(), entry.getKey().toString());
+    }
     return longWatermarkMap;
   }
 

Reply via email to