This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7675dc43d Add completion watermark to State (#3659)
7675dc43d is described below

commit 7675dc43dd7128bb2daedb8c6b826e8fcc9104c7
Author: Limian (Raymond) Zhang <[email protected]>
AuthorDate: Thu Mar 16 17:18:19 2023 -0700

    Add completion watermark to State (#3659)
    
    * 1. Add completeness to state
    
    * 2. Add additional unit test checks
    
    * 3. revert local gradle changes
    
    * Refactor how we store watermark in State
    
    * Refactor param name
    
    * Refactor
---
 .../iceberg/writer/IcebergMetadataWriter.java      | 22 +++++++++++++++-------
 .../writer/IcebergMetadataWriterConfigKeys.java    |  1 +
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 12 +++++++-----
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index e643b1dbb..9c2c4fdf1 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -31,6 +31,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
@@ -893,12 +894,13 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    */
   private void checkAndUpdateCompletenessWatermark(TableMetadata 
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
       Map<String, String> props) {
+    String tableName = tableMetadata.table.get().name();
     if (topic == null) {
       log.error(String.format("Not performing audit check. %s is null. Please 
set as table property of %s",
-          TOPIC_NAME_KEY, tableMetadata.table.get().name()));
+          TOPIC_NAME_KEY, tableName));
     }
     long newCompletenessWatermark =
-        computeCompletenessWatermark(topic, timestamps, 
tableMetadata.completionWatermark);
+        computeCompletenessWatermark(tableName, topic, timestamps, 
tableMetadata.completionWatermark);
     if (newCompletenessWatermark > tableMetadata.completionWatermark) {
       log.info(String.format("Updating %s for %s to %s", 
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
           newCompletenessWatermark));
@@ -922,13 +924,14 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
    *  break
    * Using a {@link TimeIterator} that operates over a range of time in 1 unit
    * given the start, end and granularity
-   * @param table
+   * @param catalogDbTableName
+   * @param topicName
    * @param timestamps a sorted set of timestamps in decreasing order
    * @param previousWatermark previous completion watermark for the table
    * @return updated completion watermark
    */
-  private long computeCompletenessWatermark(String table, 
SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
-    log.info(String.format("Compute completion watermark for %s and timestamps 
%s with previous watermark %s", table, timestamps, previousWatermark));
+  private long computeCompletenessWatermark(String catalogDbTableName, String 
topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
+    log.info(String.format("Compute completion watermark for %s and timestamps 
%s with previous watermark %s", topicName, timestamps, previousWatermark));
     long completionWatermark = previousWatermark;
     ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
     try {
@@ -947,9 +950,14 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         if (timestampDT.isAfter(prevWatermarkDT)
             && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) 
> 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if (auditCountVerifier.get().isComplete(table,
-              TimeIterator.dec(timestampDT, granularity, 
1).toInstant().toEpochMilli(), timestampMillis)) {
+          ZonedDateTime auditCountCheckLowerBoundDT = 
TimeIterator.dec(timestampDT, granularity, 1);
+          if (auditCountVerifier.get().isComplete(topicName,
+                  auditCountCheckLowerBoundDT.toInstant().toEpochMilli(), 
timestampMillis)) {
             completionWatermark = timestampMillis;
+            // Also persist the watermark into State object to share this with 
other MetadataWriters
+            // we enforce ourselves to always use lower-cased table name here
+            String catalogDbTableNameLowerCased = 
catalogDbTableName.toLowerCase(Locale.ROOT);
+            
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE, 
catalogDbTableNameLowerCased), completionWatermark);
             break;
           }
         } else {
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
index c846270d7..cd51ca3c6 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -40,6 +40,7 @@ public class IcebergMetadataWriterConfigKeys {
   public static final boolean DEFAULT_ICEBERG_NEW_PARTITION_ENABLED = false;
   public static final String ICEBERG_NEW_PARTITION_WHITELIST = 
"iceberg.new.partition.whitelist";
   public static final String ICEBERG_NEW_PARTITION_BLACKLIST = 
"iceberg.new.partition.blacklist";
+  public static final String STATE_COMPLETION_WATERMARK_KEY_OF_TABLE = 
"completion.watermark.%s";
 
 
 }
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index f9b3b327c..b6174ec7d 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -19,10 +19,7 @@ package org.apache.gobblin.iceberg.writer;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.avro.SchemaBuilder;
@@ -425,13 +422,16 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     // Test when completeness watermark = -1 bootstrap case
     KafkaAuditCountVerifier verifier = 
Mockito.mock(TestAuditCountVerifier.class);
     Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis - 
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
-    ((IcebergMetadataWriter) 
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
+    IcebergMetadataWriter imw = (IcebergMetadataWriter) 
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
+    imw.setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     //completeness watermark = "2020-09-16-10"
     Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), 
"testIcebergTable");
     
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
"America/Los_Angeles");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis));
+    // 1631811600000L corresponds to 2021-09-16-10 in PT
+    
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
 table.name().toLowerCase(Locale.ROOT))), 1631811600000L);
 
     Iterator<org.apache.iceberg.DataFile> dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile.getAbsolutePath())).collect().iterator();
     Assert.assertTrue(dfl.hasNext());
@@ -480,6 +480,8 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis1));
+    // watermark 1631815200000L corresponds to 2021-09-16-11 in PT
+    
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
 table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
 
     dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile2.getAbsolutePath())).collect().iterator();
     Assert.assertTrue(dfl.hasNext());

Reply via email to