This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 1548350bd [GOBBLIN-1833]Emit Completeness watermark information in 
snapshotCommitEvent (#3696)
1548350bd is described below

commit 1548350bdb3dedc10c66c4951684c75dc230c304
Author: Zihan Li <[email protected]>
AuthorDate: Thu May 18 17:34:02 2023 -0700

    [GOBBLIN-1833]Emit Completeness watermark information in 
snapshotCommitEvent (#3696)
    
    * address comments
    
    * use connectionmanager when httpclient is not cloesable
    
    * [GOBBLIN-1833]Emit Completeness watermark information in 
snapshotCommitEvent
    
    * address comments
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java     | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 1e88a83c7..3135a1243 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -788,7 +788,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    * @return kafka topic name for this table
    */
   protected String getTopicName(TableIdentifier tid, TableMetadata 
tableMetadata) {
-    if (tableMetadata.dataOffsetRange.isPresent()) {
+    if (tableMetadata.dataOffsetRange.isPresent() && 
tableMetadata.dataOffsetRange.get().size() != 0) {
       String topicPartitionString = 
tableMetadata.dataOffsetRange.get().keySet().iterator().next();
       //In case the topic name is not the table name or the topic name 
contains '-'
       return topicPartitionString.substring(0, 
topicPartitionString.lastIndexOf('-'));
@@ -1011,6 +1011,9 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         gobblinTrackingEvent.addMetadata(entry.getKey(), entry.getValue());
       }
     }
+    if (tableMetadata.completenessEnabled) {
+      gobblinTrackingEvent.addMetadata(COMPLETION_WATERMARK_KEY, 
Long.toString(tableMetadata.completionWatermark));
+    }
     eventSubmitter.submit(gobblinTrackingEvent);
   }
 

Reply via email to