This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 36cdf2bb0 [GOBBLIN-2040] Abstract comparable watermark (#3919)
36cdf2bb0 is described below

commit 36cdf2bb0550c9cf59dabce9253540f50c18f0e0
Author: Zihan Li <[email protected]>
AuthorDate: Fri Apr 19 09:06:09 2024 -0700

    [GOBBLIN-2040] Abstract comparable watermark (#3919)
    
    * address comments
    
    * use connectionmanager when httpclient is not closeable
    
    * [GOBBLIN-2040]Abstract comparable watermark
    
    * address comments
    
    ---------
    
    Co-authored-by: Zihan Li <[email protected]>
---
 .../source/extractor/ComparableWatermark.java      |  4 +++-
 .../source/extractor/extract/LongWatermark.java    | 15 +++++++++++---
 .../writer/FineGrainedWatermarkTrackerTest.java    |  6 +++---
 .../gobblin/writer/WatermarkTrackerTest.java       |  6 +++---
 .../management/copy/watermark/StringWatermark.java |  6 +++---
 .../watermarker/PartitionLevelWatermarkerTest.java |  8 ++++----
 .../source/LoopingDatasetFinderSourceTest.java     | 24 +++++++++++-----------
 .../runtime/StateStoreWatermarkStorageTest.java    |  2 +-
 8 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java
index 361f315db..512197607 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.source.extractor;
 /**
  * {@link Watermark} that is also {@link Comparable}.
  */
-public interface ComparableWatermark extends Watermark, Comparable<ComparableWatermark>{
+public interface ComparableWatermark<V extends Comparable<V>> extends Watermark, Comparable<ComparableWatermark>{
+
+  V getValue();
 
 }
diff --git a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java
index bc8296569..fbc60b9cd 100644
--- a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java
+++ b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java
@@ -29,20 +29,29 @@ import org.apache.gobblin.source.extractor.ComparableWatermark;
 import org.apache.gobblin.source.extractor.Watermark;
 
 import lombok.EqualsAndHashCode;
-import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
+
+/**
+ * Long based {@link ComparableWatermark} implementation.
+ */
 @ToString
 @EqualsAndHashCode
-public class LongWatermark implements ComparableWatermark {
+public class LongWatermark implements ComparableWatermark<Long> {
 
   private static final Gson GSON = new Gson();
 
-  @Getter
   @Setter
   private long value;
 
+  @Override
+  // Returns a Long object due to Java generics' requirement for object types.
+  // The underlying variable is maintained as a primitive long to optimize performance for mathematical operations.
+  public Long getValue(){
+    return value;
+  }
+
   public LongWatermark(long value) {
     this.value = value;
   }
diff --git 
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
 
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
index a51f7b803..f03df99cb 100644
--- 
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
+++ 
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
@@ -98,7 +98,7 @@ public class FineGrainedWatermarkTrackerTest {
     } else {
       Assert.assertEquals(uncommitted.size(), 1);
       CheckpointableWatermark uncommitable = uncommitted.get("default");
-      Assert.assertEquals(((LongWatermark) 
uncommitable.getWatermark()).getValue(), (long) holes.first());
+      Assert.assertEquals((long)((LongWatermark) 
uncommitable.getWatermark()).getValue(),  (long)holes.first());
     }
 
     Map<String, CheckpointableWatermark> commitables = 
tracker.getCommittableWatermarks();
@@ -109,9 +109,9 @@ public class FineGrainedWatermarkTrackerTest {
       Assert.assertEquals(commitables.size(), 1);
       CheckpointableWatermark commitable = commitables.get("default");
       if (holes.isEmpty()) {
-        Assert.assertEquals(((LongWatermark) 
commitable.getWatermark()).getValue(), maxWatermark);
+        Assert.assertEquals((long)((LongWatermark) 
commitable.getWatermark()).getValue(), maxWatermark);
       } else {
-        Assert.assertEquals(((LongWatermark) 
commitable.getWatermark()).getValue(), holes.first() - 1);
+        Assert.assertEquals((long)((LongWatermark) 
commitable.getWatermark()).getValue(), holes.first() - 1);
       }
     }
 
diff --git 
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
 
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
index e94aaa9af..64002c13c 100644
--- 
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
+++ 
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
@@ -42,7 +42,7 @@ public class WatermarkTrackerTest {
     commits(watermarkTracker, "default", 0, 4, 5, 6);
 
     
Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(),
 "default");
-    Assert.assertEquals(((LongWatermark) 
watermarkTracker.getCommittableWatermark("default")
+    Assert.assertEquals((long)((LongWatermark) 
watermarkTracker.getCommittableWatermark("default")
         .get().getWatermark()).getValue(), 6L);
   }
 
@@ -54,10 +54,10 @@ public class WatermarkTrackerTest {
     commits(watermarkTracker, "other", 1, 3, 5, 7);
 
     
Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(),
 "default");
-    Assert.assertEquals(((LongWatermark) 
watermarkTracker.getCommittableWatermark("default")
+    Assert.assertEquals((long)((LongWatermark) 
watermarkTracker.getCommittableWatermark("default")
         .get().getWatermark()).getValue(), 6L);
     
Assert.assertEquals(watermarkTracker.getCommittableWatermark("other").get().getSource(),
 "other");
-    Assert.assertEquals(((LongWatermark) 
watermarkTracker.getCommittableWatermark("other")
+    Assert.assertEquals((long)((LongWatermark) 
watermarkTracker.getCommittableWatermark("other")
         .get().getWatermark()).getValue(), 7L);
 
   }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
index 261fd3829..2654c722a 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
@@ -20,13 +20,13 @@ package org.apache.gobblin.data.management.copy.watermark;
 import com.google.common.base.Preconditions;
 import com.google.gson.JsonElement;
 
+import lombok.Getter;
 import org.apache.gobblin.source.extractor.ComparableWatermark;
 import org.apache.gobblin.source.extractor.Watermark;
 import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;
 
 import lombok.AllArgsConstructor;
 import lombok.EqualsAndHashCode;
-import lombok.Getter;
 
 
 /**
@@ -34,10 +34,10 @@ import lombok.Getter;
  */
 @AllArgsConstructor
 @EqualsAndHashCode
-public class StringWatermark implements ComparableWatermark {
+public class StringWatermark implements ComparableWatermark<String> {
 
   @Getter
-  String value;
+  private String value;
 
   @Override
   public int compareTo(ComparableWatermark other) {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
index 8a7d37ea7..c818c36ba 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
@@ -96,11 +96,11 @@ public class PartitionLevelWatermarkerTest {
     List<WorkUnit> workunits = Lists.newArrayList();
     watermarker.onGetWorkunitsEnd(workunits);
 
-    
Assert.assertEquals(watermarker.getPreviousHighWatermark(part1).getValue(), 0l);
-    
Assert.assertEquals(watermarker.getPreviousHighWatermark(table).getValue(), 0l);
+    
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part1).getValue(),
 0l);
+    
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table).getValue(),
 0l);
 
-    
Assert.assertEquals(watermarker.getPreviousHighWatermark(part2).getValue(), 0l);
-    
Assert.assertEquals(watermarker.getPreviousHighWatermark(table2).getValue(), 
0l);
+    
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part2).getValue(),
 0l);
+    
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table2).getValue(),
 0l);
 
     Assert.assertEquals(workunits.size(), 2);
 
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
index 41745f1af..85b85f079 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
@@ -205,17 +205,17 @@ public class LoopingDatasetFinderSourceTest {
     List<LongWatermark> watermarks1 = new ArrayList<>();
     List<Dataset> datasets1 = new ArrayList<>();
     
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset1");
-    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
     datasets1.add(dataset1);
 
     
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2");
-    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
     datasets1.add(dataset2);
 
     
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3");
-    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
     datasets1.add(dataset3);
 
@@ -244,12 +244,12 @@ public class LoopingDatasetFinderSourceTest {
 
     Assert.assertEquals(workUnits.size(), 3);
     
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset4");
-    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
     datasets2.add(dataset4);
 
     
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset5");
-    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
     datasets2.add(dataset5);
 
@@ -327,17 +327,17 @@ public class LoopingDatasetFinderSourceTest {
 
     Assert.assertEquals(workUnits.size(), 4);
     
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset1");
-    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
     datasets1.add(dataset1);
 
     
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p1");
-    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
     datasets1.add(new SimpleDatasetForTesting("dataset2@p1"));
 
     
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p2");
-    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
     datasets1.add(new SimpleDatasetForTesting("dataset2@p2"));
 
@@ -367,17 +367,17 @@ public class LoopingDatasetFinderSourceTest {
 
     Assert.assertEquals(workUnits.size(), 4);
     
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset2@p3");
-    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
     datasets2.add(new SimpleDatasetForTesting("dataset2@p3"));
 
     
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3@p1");
-    
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
     datasets2.add(new SimpleDatasetForTesting("dataset3@p1"));
 
     
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3@p2");
-    
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks2.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
     datasets2.add(new SimpleDatasetForTesting("dataset3@p2"));
 
@@ -404,7 +404,7 @@ public class LoopingDatasetFinderSourceTest {
 
     Assert.assertEquals(workUnits.size(), 2);
     
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
 "dataset3@p3");
-    
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
+    
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
 0);
     
watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
     datasets3.add(new SimpleDatasetForTesting("dataset3@p3"));
 
diff --git 
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
 
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
index 19167aee1..d22f76b2a 100644
--- 
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
+++ 
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
@@ -68,7 +68,7 @@ public class StateStoreWatermarkStorageTest {
         ImmutableList.of("source"));
 
     Assert.assertEquals(watermarkMap.size(), 1);
-    Assert.assertEquals(((LongWatermark) 
watermarkMap.get("source").getWatermark()).getValue(), startTime);
+    Assert.assertEquals((long)((LongWatermark) 
watermarkMap.get("source").getWatermark()).getValue(), startTime);
   }
 
   @AfterClass

Reply via email to