This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 36cdf2bb0 [GOBBLIN-2040] Abstract comparable watermark (#3919)
36cdf2bb0 is described below
commit 36cdf2bb0550c9cf59dabce9253540f50c18f0e0
Author: Zihan Li <[email protected]>
AuthorDate: Fri Apr 19 09:06:09 2024 -0700
[GOBBLIN-2040] Abstract comparable watermark (#3919)
* address comments
* use connectionmanager when httpclient is not closeable
* [GOBBLIN-2040] Abstract comparable watermark
* address comments
---------
Co-authored-by: Zihan Li <[email protected]>
---
.../source/extractor/ComparableWatermark.java | 4 +++-
.../source/extractor/extract/LongWatermark.java | 15 +++++++++++---
.../writer/FineGrainedWatermarkTrackerTest.java | 6 +++---
.../gobblin/writer/WatermarkTrackerTest.java | 6 +++---
.../management/copy/watermark/StringWatermark.java | 6 +++---
.../watermarker/PartitionLevelWatermarkerTest.java | 8 ++++----
.../source/LoopingDatasetFinderSourceTest.java | 24 +++++++++++-----------
.../runtime/StateStoreWatermarkStorageTest.java | 2 +-
8 files changed, 41 insertions(+), 30 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java
b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java
index 361f315db..512197607 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.source.extractor;
/**
* {@link Watermark} that is also {@link Comparable}.
*/
-public interface ComparableWatermark extends Watermark,
Comparable<ComparableWatermark>{
+public interface ComparableWatermark<V extends Comparable<V>> extends
Watermark, Comparable<ComparableWatermark>{
+
+ V getValue();
}
diff --git
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java
index bc8296569..fbc60b9cd 100644
---
a/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java
+++
b/gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java
@@ -29,20 +29,29 @@ import
org.apache.gobblin.source.extractor.ComparableWatermark;
import org.apache.gobblin.source.extractor.Watermark;
import lombok.EqualsAndHashCode;
-import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+
+/**
+ * Long based {@link ComparableWatermark} implementation.
+ */
@ToString
@EqualsAndHashCode
-public class LongWatermark implements ComparableWatermark {
+public class LongWatermark implements ComparableWatermark<Long> {
private static final Gson GSON = new Gson();
- @Getter
@Setter
private long value;
+ @Override
+ // Returns a Long object due to Java generics' requirement for object types.
+ // The underlying variable is maintained as a primitive long to optimize
performance for mathematical operations.
+ public Long getValue(){
+ return value;
+ }
+
public LongWatermark(long value) {
this.value = value;
}
diff --git
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
index a51f7b803..f03df99cb 100644
---
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
+++
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java
@@ -98,7 +98,7 @@ public class FineGrainedWatermarkTrackerTest {
} else {
Assert.assertEquals(uncommitted.size(), 1);
CheckpointableWatermark uncommitable = uncommitted.get("default");
- Assert.assertEquals(((LongWatermark)
uncommitable.getWatermark()).getValue(), (long) holes.first());
+ Assert.assertEquals((long)((LongWatermark)
uncommitable.getWatermark()).getValue(), (long)holes.first());
}
Map<String, CheckpointableWatermark> commitables =
tracker.getCommittableWatermarks();
@@ -109,9 +109,9 @@ public class FineGrainedWatermarkTrackerTest {
Assert.assertEquals(commitables.size(), 1);
CheckpointableWatermark commitable = commitables.get("default");
if (holes.isEmpty()) {
- Assert.assertEquals(((LongWatermark)
commitable.getWatermark()).getValue(), maxWatermark);
+ Assert.assertEquals((long)((LongWatermark)
commitable.getWatermark()).getValue(), maxWatermark);
} else {
- Assert.assertEquals(((LongWatermark)
commitable.getWatermark()).getValue(), holes.first() - 1);
+ Assert.assertEquals((long)((LongWatermark)
commitable.getWatermark()).getValue(), holes.first() - 1);
}
}
diff --git
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
index e94aaa9af..64002c13c 100644
---
a/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
+++
b/gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java
@@ -42,7 +42,7 @@ public class WatermarkTrackerTest {
commits(watermarkTracker, "default", 0, 4, 5, 6);
Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(),
"default");
- Assert.assertEquals(((LongWatermark)
watermarkTracker.getCommittableWatermark("default")
+ Assert.assertEquals((long)((LongWatermark)
watermarkTracker.getCommittableWatermark("default")
.get().getWatermark()).getValue(), 6L);
}
@@ -54,10 +54,10 @@ public class WatermarkTrackerTest {
commits(watermarkTracker, "other", 1, 3, 5, 7);
Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(),
"default");
- Assert.assertEquals(((LongWatermark)
watermarkTracker.getCommittableWatermark("default")
+ Assert.assertEquals((long)((LongWatermark)
watermarkTracker.getCommittableWatermark("default")
.get().getWatermark()).getValue(), 6L);
Assert.assertEquals(watermarkTracker.getCommittableWatermark("other").get().getSource(),
"other");
- Assert.assertEquals(((LongWatermark)
watermarkTracker.getCommittableWatermark("other")
+ Assert.assertEquals((long)((LongWatermark)
watermarkTracker.getCommittableWatermark("other")
.get().getWatermark()).getValue(), 7L);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
index 261fd3829..2654c722a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java
@@ -20,13 +20,13 @@ package org.apache.gobblin.data.management.copy.watermark;
import com.google.common.base.Preconditions;
import com.google.gson.JsonElement;
+import lombok.Getter;
import org.apache.gobblin.source.extractor.ComparableWatermark;
import org.apache.gobblin.source.extractor.Watermark;
import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
-import lombok.Getter;
/**
@@ -34,10 +34,10 @@ import lombok.Getter;
*/
@AllArgsConstructor
@EqualsAndHashCode
-public class StringWatermark implements ComparableWatermark {
+public class StringWatermark implements ComparableWatermark<String> {
@Getter
- String value;
+ private String value;
@Override
public int compareTo(ComparableWatermark other) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
index 8a7d37ea7..c818c36ba 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java
@@ -96,11 +96,11 @@ public class PartitionLevelWatermarkerTest {
List<WorkUnit> workunits = Lists.newArrayList();
watermarker.onGetWorkunitsEnd(workunits);
-
Assert.assertEquals(watermarker.getPreviousHighWatermark(part1).getValue(), 0l);
-
Assert.assertEquals(watermarker.getPreviousHighWatermark(table).getValue(), 0l);
+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part1).getValue(),
0l);
+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table).getValue(),
0l);
-
Assert.assertEquals(watermarker.getPreviousHighWatermark(part2).getValue(), 0l);
-
Assert.assertEquals(watermarker.getPreviousHighWatermark(table2).getValue(),
0l);
+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part2).getValue(),
0l);
+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table2).getValue(),
0l);
Assert.assertEquals(workunits.size(), 2);
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
index 41745f1af..85b85f079 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java
@@ -205,17 +205,17 @@ public class LoopingDatasetFinderSourceTest {
List<LongWatermark> watermarks1 = new ArrayList<>();
List<Dataset> datasets1 = new ArrayList<>();
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset1");
-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset1);
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset2");
-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset2);
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset3");
-
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset3);
@@ -244,12 +244,12 @@ public class LoopingDatasetFinderSourceTest {
Assert.assertEquals(workUnits.size(), 3);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset4");
-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(dataset4);
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset5");
-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(dataset5);
@@ -327,17 +327,17 @@ public class LoopingDatasetFinderSourceTest {
Assert.assertEquals(workUnits.size(), 4);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset1");
-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(dataset1);
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset2@p1");
-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(new SimpleDatasetForTesting("dataset2@p1"));
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset2@p2");
-
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
datasets1.add(new SimpleDatasetForTesting("dataset2@p2"));
@@ -367,17 +367,17 @@ public class LoopingDatasetFinderSourceTest {
Assert.assertEquals(workUnits.size(), 4);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset2@p3");
-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(new SimpleDatasetForTesting("dataset2@p3"));
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset3@p1");
-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(new SimpleDatasetForTesting("dataset3@p1"));
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset3@p2");
-
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks2.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
datasets2.add(new SimpleDatasetForTesting("dataset3@p2"));
@@ -404,7 +404,7 @@ public class LoopingDatasetFinderSourceTest {
Assert.assertEquals(workUnits.size(), 2);
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY),
"dataset3@p3");
-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(),
0);
watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
datasets3.add(new SimpleDatasetForTesting("dataset3@p3"));
diff --git
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
index 19167aee1..d22f76b2a 100644
---
a/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
+++
b/gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java
@@ -68,7 +68,7 @@ public class StateStoreWatermarkStorageTest {
ImmutableList.of("source"));
Assert.assertEquals(watermarkMap.size(), 1);
- Assert.assertEquals(((LongWatermark)
watermarkMap.get("source").getWatermark()).getValue(), startTime);
+ Assert.assertEquals((long)((LongWatermark)
watermarkMap.get("source").getWatermark()).getValue(), startTime);
}
@AfterClass