This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7675dc43d Add completion watermark to State (#3659)
7675dc43d is described below
commit 7675dc43dd7128bb2daedb8c6b826e8fcc9104c7
Author: Limian (Raymond) Zhang <[email protected]>
AuthorDate: Thu Mar 16 17:18:19 2023 -0700
Add completion watermark to State (#3659)
* 1. Add completeness to state
* 2. Add additional unit test checks
* 3. revert local gradle changes
* Refactor how we store watermark in State
* Refactor param name
* Refactor
---
.../iceberg/writer/IcebergMetadataWriter.java | 22 +++++++++++++++-------
.../writer/IcebergMetadataWriterConfigKeys.java | 1 +
.../iceberg/writer/IcebergMetadataWriterTest.java | 12 +++++++-----
3 files changed, 23 insertions(+), 12 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index e643b1dbb..9c2c4fdf1 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -31,6 +31,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
@@ -893,12 +894,13 @@ public class IcebergMetadataWriter implements
MetadataWriter {
*/
private void checkAndUpdateCompletenessWatermark(TableMetadata
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
Map<String, String> props) {
+ String tableName = tableMetadata.table.get().name();
if (topic == null) {
log.error(String.format("Not performing audit check. %s is null. Please
set as table property of %s",
- TOPIC_NAME_KEY, tableMetadata.table.get().name()));
+ TOPIC_NAME_KEY, tableName));
}
long newCompletenessWatermark =
- computeCompletenessWatermark(topic, timestamps,
tableMetadata.completionWatermark);
+ computeCompletenessWatermark(tableName, topic, timestamps,
tableMetadata.completionWatermark);
if (newCompletenessWatermark > tableMetadata.completionWatermark) {
log.info(String.format("Updating %s for %s to %s",
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
newCompletenessWatermark));
@@ -922,13 +924,14 @@ public class IcebergMetadataWriter implements
MetadataWriter {
* break
* Using a {@link TimeIterator} that operates over a range of time in 1 unit
* given the start, end and granularity
- * @param table
+ * @param catalogDbTableName
+ * @param topicName
* @param timestamps a sorted set of timestamps in decreasing order
* @param previousWatermark previous completion watermark for the table
* @return updated completion watermark
*/
- private long computeCompletenessWatermark(String table,
SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
- log.info(String.format("Compute completion watermark for %s and timestamps
%s with previous watermark %s", table, timestamps, previousWatermark));
+ private long computeCompletenessWatermark(String catalogDbTableName, String
topicName, SortedSet<ZonedDateTime> timestamps, long previousWatermark) {
+ log.info(String.format("Compute completion watermark for %s and timestamps
%s with previous watermark %s", topicName, timestamps, previousWatermark));
long completionWatermark = previousWatermark;
ZonedDateTime now = ZonedDateTime.now(ZoneId.of(this.timeZone));
try {
@@ -947,9 +950,14 @@ public class IcebergMetadataWriter implements
MetadataWriter {
if (timestampDT.isAfter(prevWatermarkDT)
&& TimeIterator.durationBetween(prevWatermarkDT, now, granularity)
> 0) {
long timestampMillis = timestampDT.toInstant().toEpochMilli();
- if (auditCountVerifier.get().isComplete(table,
- TimeIterator.dec(timestampDT, granularity,
1).toInstant().toEpochMilli(), timestampMillis)) {
+ ZonedDateTime auditCountCheckLowerBoundDT =
TimeIterator.dec(timestampDT, granularity, 1);
+ if (auditCountVerifier.get().isComplete(topicName,
+ auditCountCheckLowerBoundDT.toInstant().toEpochMilli(),
timestampMillis)) {
completionWatermark = timestampMillis;
+ // Also persist the watermark into State object to share this with
other MetadataWriters
+ // we enforce ourselves to always use lower-cased table name here
+ String catalogDbTableNameLowerCased =
catalogDbTableName.toLowerCase(Locale.ROOT);
+
this.state.setProp(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
catalogDbTableNameLowerCased), completionWatermark);
break;
}
} else {
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
index c846270d7..cd51ca3c6 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterConfigKeys.java
@@ -40,6 +40,7 @@ public class IcebergMetadataWriterConfigKeys {
public static final boolean DEFAULT_ICEBERG_NEW_PARTITION_ENABLED = false;
public static final String ICEBERG_NEW_PARTITION_WHITELIST =
"iceberg.new.partition.whitelist";
public static final String ICEBERG_NEW_PARTITION_BLACKLIST =
"iceberg.new.partition.blacklist";
+ public static final String STATE_COMPLETION_WATERMARK_KEY_OF_TABLE =
"completion.watermark.%s";
}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index f9b3b327c..b6174ec7d 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -19,10 +19,7 @@ package org.apache.gobblin.iceberg.writer;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.TimeUnit;
import org.apache.avro.SchemaBuilder;
@@ -425,13 +422,16 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis -
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
- ((IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
+ IcebergMetadataWriter imw = (IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next();
+ imw.setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
//completeness watermark = "2020-09-16-10"
Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY),
"testIcebergTable");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
+ // 1631811600000L corresponds to 2021-09-16-10 in PT
+
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
table.name().toLowerCase(Locale.ROOT))), 1631811600000L);
Iterator<org.apache.iceberg.DataFile> dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile.getAbsolutePath())).collect().iterator();
Assert.assertTrue(dfl.hasNext());
@@ -480,6 +480,8 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis1));
+ // watermark 1631815200000L correspond to 2021-09-16-11 in PT
+
Assert.assertEquals(imw.state.getPropAsLong(String.format(STATE_COMPLETION_WATERMARK_KEY_OF_TABLE,
table.name().toLowerCase(Locale.ROOT))), 1631815200000L);
dfl =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile2.getAbsolutePath())).collect().iterator();
Assert.assertTrue(dfl.hasNext());