This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a92e1e  [GOBBLIN-1547] Set completion watermark to end of hour 
instead of beg… (#3398)
4a92e1e is described below

commit 4a92e1e4e848310a5d9e76725a5b4717707e9f42
Author: vbohra <[email protected]>
AuthorDate: Fri Sep 17 15:47:20 2021 -0700

    [GOBBLIN-1547] Set completion watermark to end of hour instead of beg… 
(#3398)
    
    * [GOBBLIN-1547] Set completion watermark to end of hour instead of 
beginning
    
    * Fixed code style
---
 .../iceberg/writer/IcebergMetadataWriter.java      | 16 ++++++----
 .../iceberg/writer/IcebergMetadataWriterTest.java  | 36 +++++++++++-----------
 2 files changed, 28 insertions(+), 24 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index f67e614..28051a8 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -729,9 +729,10 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   private int isLate(String datepartition, long previousWatermark) {
     ZonedDateTime partitionDateTime = ZonedDateTime.parse(datepartition, 
HOURLY_DATEPARTITION_FORMAT);
     long partitionEpochTime = partitionDateTime.toInstant().toEpochMilli();
-    if(partitionEpochTime > previousWatermark) {
+    if(partitionEpochTime >= previousWatermark) {
       return 0;
-    } else if(partitionEpochTime <= previousWatermark && 
partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark)))
 {
+    } else if(partitionEpochTime < previousWatermark
+        && 
partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark)))
 {
       return 1;
     } else {
       return 2;
@@ -861,11 +862,13 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   }
 
   /**
+   * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit 
counts match
+   * for that window (i.e. it is set to the beginning of the next window)
    * For each timestamp in sorted collection of timestamps in descending order
    * if timestamp is greater than previousWatermark
-   * and hour(now) > hour(prevWatermark) + 1
+   * and hour(now) > hour(prevWatermark)
    *    check audit counts for completeness between
-   *    a source and reference tier for [timestamp, timstamp + 1 unit of 
granularity]
+   *    a source and reference tier for [timestamp - 1 unit of granularity, 
timestamp]
    *    If the audit count matches update the watermark to the timestamp and 
break
    *    else continue
    * else
@@ -895,9 +898,10 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
       while (iterator.hasNext()) {
         ZonedDateTime timestampDT = iterator.next();
         if (timestampDT.isAfter(prevWatermarkDT)
-            && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) 
> 1) {
+            && TimeIterator.durationBetween(prevWatermarkDT, now, granularity) 
> 0) {
           long timestampMillis = timestampDT.toInstant().toEpochMilli();
-          if(auditCountVerifier.get().isComplete(table, timestampMillis, 
TimeIterator.inc(timestampDT, granularity, 1).toInstant().toEpochMilli())) {
+          if (auditCountVerifier.get().isComplete(table,
+              TimeIterator.dec(timestampDT, granularity, 
1).toInstant().toEpochMilli(), timestampMillis)) {
             completionWatermark = timestampMillis;
             break;
           }
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index c234675..8aba5e7 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -330,8 +330,8 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     // Creating a copy of gmce with static type in GenericRecord to work with 
writeEnvelop method
     // without risking running into type cast runtime error.
     gmce.setOperationType(OperationType.add_files);
-    File hourlyFile = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2020/03/17/10/data.avro");
-    long timestampMillis = 1584464400000L;
+    File hourlyFile = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2021/09/16/10/data.avro");
+    long timestampMillis = 1631811600000L;
     Files.createParentDirs(hourlyFile);
     writeRecord(hourlyFile);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -351,13 +351,13 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     Assert.assertTrue(table.spec().fields().size() == 2);
     Assert.assertEquals(table.spec().fields().get(1).name(), "late");
 
-    // Test when completeness watermark < "2020-03-17-10"
+    // Test when completeness watermark = -1 bootstrap case
     KafkaAuditCountVerifier verifier = 
Mockito.mock(TestAuditCountVerifier.class);
-    Mockito.when(verifier.isComplete("testTopic", timestampMillis, 
timestampMillis + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopic", timestampMillis - 
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
     ((IcebergMetadataWriter) 
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
-    //completeness watermark = "2020-03-17-10"
+    //completeness watermark = "2021-09-16-10"
     Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
     
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY), 
"America/Los_Angeles");
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis));
@@ -365,8 +365,8 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     Iterator<org.apache.iceberg.DataFile> dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile.getAbsolutePath())).collect().iterator();
     Assert.assertTrue(dfl.hasNext());
 
-    // Test when completeness watermark is still "2020-03-17-10" but have a 
late file for "2020-03-17-10"
-    File hourlyFile1 = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2020/03/17/10/data1.avro");
+    // Test when completeness watermark is still "2021-09-16-10" but have a 
late file for "2021-09-16-09"
+    File hourlyFile1 = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2021/09/16/09/data1.avro");
     Files.createParentDirs(hourlyFile1);
     writeRecord(hourlyFile1);
     gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -386,10 +386,10 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
 
     dfl = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile1.getAbsolutePath())).collect().iterator();
     Assert.assertTrue(dfl.hasNext());
-    Assert.assertTrue(dfl.next().partition().get(1, Integer.class) == 1);
+    Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
 
-    // Test when completeness watermark will advance to "2020-03-17-11"
-    File hourlyFile2 = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2020/03/17/11/data.avro");
+    // Test when completeness watermark will advance to "2021-09-16-11"
+    File hourlyFile2 = new File(tmpDir, 
"data/tracking/testIcebergTable/hourly/2021/09/16/11/data.avro");
     long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
     Files.createParentDirs(hourlyFile2);
     writeRecord(hourlyFile2);
@@ -405,7 +405,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
             new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
             new LongWatermark(60L))));
 
-    Mockito.when(verifier.isComplete("testTopic", timestampMillis1, 
timestampMillis1 + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+    Mockito.when(verifier.isComplete("testTopic", timestampMillis1 - 
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
     gobblinMCEWriterWithCompletness.flush();
     table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
     Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY), 
String.valueOf(timestampMillis1));
@@ -440,13 +440,13 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
         partitionValue = "2020-03-17-08";
       } else if (path.toString().contains("hourly/2020/03/17/09")) {
         partitionValue = "2020-03-17-09";
-      } else if (path.toString().contains("hourly/2020/03/17/10")) {
-        partitionValue = "2020-03-17-10";
-      }  else if (path.toString().contains("hourly/2020/03/17/11")) {
-        partitionValue = "2020-03-17-11";
-      } else if (path.toString().contains("hourly/2020/03/17/12")) {
-        partitionValue = "2020-03-17-12";
-      } else if (path.toString().contains("daily/2020/03/17")) {
+      } else if (path.toString().contains("hourly/2021/09/16/09")) {
+        partitionValue = "2021-09-16-09";
+      } else if (path.toString().contains("hourly/2021/09/16/10")) {
+        partitionValue = "2021-09-16-10";
+      } else if (path.toString().contains("hourly/2021/09/16/11")) {
+        partitionValue = "2021-09-16-11";
+      }  else if (path.toString().contains("daily/2020/03/17")) {
         partitionValue = "2020-03-17-00";
       }
       return Optional.of(new 
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))

Reply via email to