This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4a92e1e [GOBBLIN-1547] Set completion watermark to end of hour
instead of beg… (#3398)
4a92e1e is described below
commit 4a92e1e4e848310a5d9e76725a5b4717707e9f42
Author: vbohra <[email protected]>
AuthorDate: Fri Sep 17 15:47:20 2021 -0700
[GOBBLIN-1547] Set completion watermark to end of hour instead of beg…
(#3398)
* [GOBBLIN-1547] Set completion watermark to end of hour instead of
beginning
* Fixed code style
---
.../iceberg/writer/IcebergMetadataWriter.java | 16 ++++++----
.../iceberg/writer/IcebergMetadataWriterTest.java | 36 +++++++++++-----------
2 files changed, 28 insertions(+), 24 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index f67e614..28051a8 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -729,9 +729,10 @@ public class IcebergMetadataWriter implements
MetadataWriter {
private int isLate(String datepartition, long previousWatermark) {
ZonedDateTime partitionDateTime = ZonedDateTime.parse(datepartition,
HOURLY_DATEPARTITION_FORMAT);
long partitionEpochTime = partitionDateTime.toInstant().toEpochMilli();
- if(partitionEpochTime > previousWatermark) {
+ if(partitionEpochTime >= previousWatermark) {
return 0;
- } else if(partitionEpochTime <= previousWatermark &&
partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark)))
{
+ } else if(partitionEpochTime < previousWatermark
+ &&
partitionDateTime.toLocalDate().equals(getDateFromEpochMillis(previousWatermark)))
{
return 1;
} else {
return 2;
@@ -861,11 +862,13 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
/**
+ * NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit
counts match
+ * for that window (i.e. it is set to the beginning of the next window)
* For each timestamp in sorted collection of timestamps in descending order
* if timestamp is greater than previousWatermark
- * and hour(now) > hour(prevWatermark) + 1
+ * and hour(now) > hour(prevWatermark)
* check audit counts for completeness between
- * a source and reference tier for [timestamp, timstamp + 1 unit of
granularity]
+ * a source and reference tier for [timestamp - 1 unit of granularity,
timestamp]
* If the audit count matches update the watermark to the timestamp and
break
* else continue
* else
@@ -895,9 +898,10 @@ public class IcebergMetadataWriter implements
MetadataWriter {
while (iterator.hasNext()) {
ZonedDateTime timestampDT = iterator.next();
if (timestampDT.isAfter(prevWatermarkDT)
- && TimeIterator.durationBetween(prevWatermarkDT, now, granularity)
> 1) {
+ && TimeIterator.durationBetween(prevWatermarkDT, now, granularity)
> 0) {
long timestampMillis = timestampDT.toInstant().toEpochMilli();
- if(auditCountVerifier.get().isComplete(table, timestampMillis,
TimeIterator.inc(timestampDT, granularity, 1).toInstant().toEpochMilli())) {
+ if (auditCountVerifier.get().isComplete(table,
+ TimeIterator.dec(timestampDT, granularity,
1).toInstant().toEpochMilli(), timestampMillis)) {
completionWatermark = timestampMillis;
break;
}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index c234675..8aba5e7 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -330,8 +330,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Creating a copy of gmce with static type in GenericRecord to work with
writeEnvelop method
// without risking running into type cast runtime error.
gmce.setOperationType(OperationType.add_files);
- File hourlyFile = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2020/03/17/10/data.avro");
- long timestampMillis = 1584464400000L;
+ File hourlyFile = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2021/09/16/10/data.avro");
+ long timestampMillis = 1631811600000L;
Files.createParentDirs(hourlyFile);
writeRecord(hourlyFile);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -351,13 +351,13 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertTrue(table.spec().fields().size() == 2);
Assert.assertEquals(table.spec().fields().get(1).name(), "late");
- // Test when completeness watermark < "2020-03-17-10"
+ // Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
- Mockito.when(verifier.isComplete("testTopic", timestampMillis,
timestampMillis + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopic", timestampMillis -
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
((IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
- //completeness watermark = "2020-03-17-10"
+ //completeness watermark = "2021-09-16-10"
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
@@ -365,8 +365,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Iterator<org.apache.iceberg.DataFile> dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile.getAbsolutePath())).collect().iterator();
Assert.assertTrue(dfl.hasNext());
- // Test when completeness watermark is still "2020-03-17-10" but have a
late file for "2020-03-17-10"
- File hourlyFile1 = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2020/03/17/10/data1.avro");
+ // Test when completeness watermark is still "2021-09-16-10" but have a
late file for "2021-09-16-09"
+ File hourlyFile1 = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2021/09/16/09/data1.avro");
Files.createParentDirs(hourlyFile1);
writeRecord(hourlyFile1);
gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
@@ -386,10 +386,10 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile1.getAbsolutePath())).collect().iterator();
Assert.assertTrue(dfl.hasNext());
- Assert.assertTrue(dfl.next().partition().get(1, Integer.class) == 1);
+ Assert.assertEquals((int) dfl.next().partition().get(1, Integer.class), 1);
- // Test when completeness watermark will advance to "2020-03-17-11"
- File hourlyFile2 = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2020/03/17/11/data.avro");
+ // Test when completeness watermark will advance to "2021-09-16-11"
+ File hourlyFile2 = new File(tmpDir,
"data/tracking/testIcebergTable/hourly/2021/09/16/11/data.avro");
long timestampMillis1 = timestampMillis + TimeUnit.HOURS.toMillis(1);
Files.createParentDirs(hourlyFile2);
writeRecord(hourlyFile2);
@@ -405,7 +405,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(60L))));
- Mockito.when(verifier.isComplete("testTopic", timestampMillis1,
timestampMillis1 + TimeUnit.HOURS.toMillis(1))).thenReturn(true);
+ Mockito.when(verifier.isComplete("testTopic", timestampMillis1 -
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis1));
@@ -440,13 +440,13 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
partitionValue = "2020-03-17-08";
} else if (path.toString().contains("hourly/2020/03/17/09")) {
partitionValue = "2020-03-17-09";
- } else if (path.toString().contains("hourly/2020/03/17/10")) {
- partitionValue = "2020-03-17-10";
- } else if (path.toString().contains("hourly/2020/03/17/11")) {
- partitionValue = "2020-03-17-11";
- } else if (path.toString().contains("hourly/2020/03/17/12")) {
- partitionValue = "2020-03-17-12";
- } else if (path.toString().contains("daily/2020/03/17")) {
+ } else if (path.toString().contains("hourly/2021/09/16/09")) {
+ partitionValue = "2021-09-16-09";
+ } else if (path.toString().contains("hourly/2021/09/16/10")) {
+ partitionValue = "2021-09-16-10";
+ } else if (path.toString().contains("hourly/2021/09/16/11")) {
+ partitionValue = "2021-09-16-11";
+ } else if (path.toString().contains("daily/2020/03/17")) {
partitionValue = "2020-03-17-00";
}
return Optional.of(new
HivePartition.Builder().withPartitionValues(Lists.newArrayList(partitionValue))