This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ef18c482b [GOBBLIN-1657] Update completion watermark on
change_property in IcebergMetadataWriter (#3517)
ef18c482b is described below
commit ef18c482b241378f8912fa1944914167a61f147c
Author: vbohra <[email protected]>
AuthorDate: Tue Jun 7 14:48:27 2022 -0700
[GOBBLIN-1657] Update completion watermark on change_property in
IcebergMetadataWriter (#3517)
* [GOBBLIN-1655] Update completion watermark for quiet tables during
iceberg registration
* [GOBBLIN-1657] Update completion watermark on change_property GMCE
* Added test case to check watermark update on change_property
Co-authored-by: Vikram Bohra <[email protected]>
---
.../verifier/KafkaAuditCountVerifier.java | 7 ++
.../iceberg/publisher/GobblinMCEPublisher.java | 3 +
.../iceberg/writer/IcebergMetadataWriter.java | 86 ++++++++++++++--------
.../iceberg/writer/IcebergMetadataWriterTest.java | 41 ++++++++++-
4 files changed, 103 insertions(+), 34 deletions(-)
diff --git
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
index 04d49a1db..d4495f9eb 100644
---
a/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
+++
b/gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java
@@ -45,6 +45,8 @@ public class KafkaAuditCountVerifier {
public static final String REFERENCE_TIERS = COMPLETENESS_PREFIX +
"reference.tiers";
public static final String THRESHOLD = COMPLETENESS_PREFIX + "threshold";
private static final double DEFAULT_THRESHOLD = 0.999;
+ public static final String COMPLETE_ON_NO_COUNTS = COMPLETENESS_PREFIX +
"complete.on.no.counts";
+ private final boolean returnCompleteOnNoCounts;
private final AuditCountClient auditCountClient;
private final String srcTier;
@@ -67,6 +69,7 @@ public class KafkaAuditCountVerifier {
state.getPropAsDouble(THRESHOLD, DEFAULT_THRESHOLD);
this.srcTier = state.getProp(SOURCE_TIER);
this.refTiers =
Splitter.on(",").omitEmptyStrings().trimResults().splitToList(state.getProp(REFERENCE_TIERS));
+ this.returnCompleteOnNoCounts =
state.getPropAsBoolean(COMPLETE_ON_NO_COUNTS, false);
}
/**
@@ -119,6 +122,10 @@ public class KafkaAuditCountVerifier {
Map<String, Long> countsByTier = getTierAndCount(datasetName,
beginInMillis, endInMillis);
log.info(String.format("Audit counts map for %s for range [%s,%s]",
datasetName, beginInMillis, endInMillis));
countsByTier.forEach((x,y) -> log.info(String.format(" %s : %s ", x, y)));
+ if (countsByTier.isEmpty() && this.returnCompleteOnNoCounts) {
+ log.info(String.format("Found empty counts map for %s, returning
complete", datasetName));
+ return 1.0;
+ }
double percent = -1;
if (!countsByTier.containsKey(this.srcTier)) {
throw new IOException(String.format("Source tier %s audit count cannot
be retrieved for dataset %s between %s and %s", this.srcTier, datasetName,
beginInMillis, endInMillis));
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
index 8b76843af..3e58a53d1 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java
@@ -98,7 +98,10 @@ public class GobblinMCEPublisher extends DataPublisher {
// There'll be only one dummy file here. This file is parsed for DB
and table name calculation.
newFiles = computeDummyFile(state);
if (!newFiles.isEmpty()) {
+ log.info("Dummy file: " + newFiles.keySet().iterator().next());
this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.change_property, SchemaSource.NONE);
+ } else {
+ log.info("No dummy file created. Not sending GMCE");
}
} else {
this.producer.sendGMCE(newFiles, null, null, offsetRange,
OperationType.add_files, SchemaSource.SCHEMAREGISTRY);
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index ebc633e2c..9b0b18450 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -312,6 +312,11 @@ public class IcebergMetadataWriter implements
MetadataWriter {
return;
}
}
+ if(tableMetadata.completenessEnabled) {
+ tableMetadata.completionWatermark =
Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
+ String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
+ }
+
computeCandidateSchema(gmce, tid, tableSpec);
tableMetadata.ensureTxnInit();
tableMetadata.lowestGMCEEmittedTime =
Long.min(tableMetadata.lowestGMCEEmittedTime, gmce.getGMCEmittedTime());
@@ -322,12 +327,6 @@ public class IcebergMetadataWriter implements
MetadataWriter {
if (gmce.getTopicPartitionOffsetsRange() != null) {
mergeOffsets(gmce, tid);
}
- //compute topic name
- if(!tableMetadata.newProperties.get().containsKey(TOPIC_NAME_KEY) &&
- tableMetadata.dataOffsetRange.isPresent() &&
!tableMetadata.dataOffsetRange.get().isEmpty()) {
- String topicPartition =
tableMetadata.dataOffsetRange.get().keySet().iterator().next();
- tableMetadata.newProperties.get().put(TOPIC_NAME_KEY,
topicPartition.substring(0, topicPartition.lastIndexOf("-")));
- }
break;
}
case rewrite_files: {
@@ -411,6 +410,9 @@ public class IcebergMetadataWriter implements
MetadataWriter {
org.apache.hadoop.hive.metastore.api.Table table =
HiveMetaStoreUtils.getTable(tableSpec.getTable());
TableMetadata tableMetadata = tableMetadataMap.computeIfAbsent(tid, t ->
new TableMetadata());
tableMetadata.newProperties =
Optional.of(IcebergUtils.getTableProperties(table));
+ String nativeName = tableMetadata.datasetName;
+ String topic = nativeName.substring(nativeName.lastIndexOf("/") + 1);
+ tableMetadata.newProperties.get().put(TOPIC_NAME_KEY, topic);
}
/**
@@ -692,8 +694,6 @@ public class IcebergMetadataWriter implements
MetadataWriter {
StructLike partition = getIcebergPartitionVal(hiveSpecs,
file.getFilePath(), partitionSpec);
if(tableMetadata.newPartitionColumnEnabled && gmce.getOperationType()
== OperationType.add_files) {
- tableMetadata.prevCompletenessWatermark =
Long.parseLong(table.properties().getOrDefault(COMPLETION_WATERMARK_KEY,
- String.valueOf(DEFAULT_COMPLETION_WATERMARK)));
// Assumes first partition value to be partitioned by date
// TODO Find better way to determine a partition value
String datepartition = partition.get(0, null);
@@ -722,8 +722,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
private StructLike addLatePartitionValueToIcebergTable(Table table,
TableMetadata tableMetadata, HivePartition hivePartition, String datepartition)
{
table = addPartitionToIcebergTable(table, newPartitionColumn,
newPartitionColumnType);
PartitionSpec partitionSpec = table.spec();
- long prevCompletenessWatermark = tableMetadata.prevCompletenessWatermark;
- int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition,
prevCompletenessWatermark);
+ int late = !tableMetadata.completenessEnabled ? 0 : isLate(datepartition,
tableMetadata.completionWatermark);
List<String> partitionValues = new ArrayList<>(hivePartition.getValues());
partitionValues.add(String.valueOf(late));
return IcebergUtils.getPartition(partitionSpec.partitionType(),
partitionValues);
@@ -790,28 +789,33 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Transaction transaction = tableMetadata.transaction.get();
Map<String, String> props = tableMetadata.newProperties.or(
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ String topic = props.get(TOPIC_NAME_KEY);
if (tableMetadata.appendFiles.isPresent()) {
tableMetadata.appendFiles.get().commit();
if (tableMetadata.completenessEnabled) {
- String topicName = props.get(TOPIC_NAME_KEY);
- if(topicName == null) {
- log.error(String.format("Not performing audit check. %s is null.
Please set as table property of %s.%s",
- TOPIC_NAME_KEY, dbName, tableName));
- } else {
- long newCompletenessWatermark =
- computeCompletenessWatermark(topicName,
tableMetadata.datePartitions, tableMetadata.prevCompletenessWatermark);
- if(newCompletenessWatermark >
tableMetadata.prevCompletenessWatermark) {
- log.info(String.format("Updating %s for %s.%s to %s",
COMPLETION_WATERMARK_KEY, dbName, tableName, newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_KEY,
String.valueOf(newCompletenessWatermark));
- props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
- tableMetadata.newCompletenessWatermark =
newCompletenessWatermark;
- }
- }
+ checkAndUpdateCompletenessWatermark(tableMetadata, topic,
tableMetadata.datePartitions, props);
}
}
if (tableMetadata.deleteFiles.isPresent()) {
tableMetadata.deleteFiles.get().commit();
}
+ // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
+ // The logic is to check the next window from previous completion
watermark and update the watermark if there are no audit counts
+ if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
+ && tableMetadata.completenessEnabled) {
+ if (tableMetadata.completionWatermark >
DEFAULT_COMPLETION_WATERMARK) {
+ log.info(String.format("Checking kafka audit for %s on
change_property ", topic));
+ SortedSet<ZonedDateTime> timestamps = new TreeSet<>();
+ ZonedDateTime prevWatermarkDT =
+
Instant.ofEpochMilli(tableMetadata.completionWatermark).atZone(ZoneId.of(this.timeZone));
+ timestamps.add(TimeIterator.inc(prevWatermarkDT,
TimeIterator.Granularity.valueOf(this.auditCheckGranularity), 1));
+ checkAndUpdateCompletenessWatermark(tableMetadata, topic,
timestamps, props);
+ } else {
+ log.info(String.format("Need valid watermark, current watermark is
%s, Not checking kafka audit for %s",
+ tableMetadata.completionWatermark, topic));
+ }
+ }
+
//Set high waterMark
Long highWatermark = tableCurrentWatermarkMap.get(tid);
props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
@@ -835,7 +839,6 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
//Update schema(commit)
updateSchema(tableMetadata, props, topicName);
-
//Update properties
UpdateProperties updateProperties = transaction.updateProperties();
props.forEach(updateProperties::set);
@@ -850,7 +853,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName,
currentProps, highWatermark);
//Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark,
tableMetadata.newCompletenessWatermark);
+ tableMetadata.reset(currentProps, highWatermark);
log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid.toString()));
} else {
log.info("There's no transaction initiated for the table {}",
tid.toString());
@@ -869,6 +872,30 @@ public class IcebergMetadataWriter implements
MetadataWriter {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}
+ /**
+ * Update TableMetadata with the new completion watermark upon a successful
audit check
+ * @param tableMetadata metadata of table
+ * @param topic topic name
+ * @param timestamps Sorted set in reverse order of timestamps to check
audit counts for
+ * @param props table properties map
+ */
+ private void checkAndUpdateCompletenessWatermark(TableMetadata
tableMetadata, String topic, SortedSet<ZonedDateTime> timestamps,
+ Map<String, String> props) {
+ if (topic == null) {
+ log.error(String.format("Not performing audit check. %s is null. Please
set as table property of %s",
+ TOPIC_NAME_KEY, tableMetadata.table.get().name()));
+ }
+ long newCompletenessWatermark =
+ computeCompletenessWatermark(topic, timestamps,
tableMetadata.completionWatermark);
+ if (newCompletenessWatermark > tableMetadata.completionWatermark) {
+ log.info(String.format("Updating %s for %s to %s",
COMPLETION_WATERMARK_KEY, tableMetadata.table.get().name(),
+ newCompletenessWatermark));
+ props.put(COMPLETION_WATERMARK_KEY,
String.valueOf(newCompletenessWatermark));
+ props.put(COMPLETION_WATERMARK_TIMEZONE_KEY, this.timeZone);
+ tableMetadata.completionWatermark = newCompletenessWatermark;
+ }
+ }
+
/**
* NOTE: completion watermark for a window [t1, t2] is marked as t2 if audit
counts match
* for that window (aka its is set to the beginning of next window)
@@ -1085,8 +1112,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
Optional<Map<String, List<Range>>> dataOffsetRange = Optional.absent();
Optional<String> lastSchemaVersion = Optional.absent();
Optional<Long> lowWatermark = Optional.absent();
- long prevCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
- long newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
+ long completionWatermark = DEFAULT_COMPLETION_WATERMARK;
SortedSet<ZonedDateTime> datePartitions = new
TreeSet<>(Collections.reverseOrder());
@Setter
@@ -1131,7 +1157,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
}
- void reset(Map<String, String> props, Long lowWaterMark, long
newCompletionWatermark) {
+ void reset(Map<String, String> props, Long lowWaterMark) {
this.lastProperties = Optional.of(props);
this.lastSchemaVersion =
Optional.of(props.get(SCHEMA_CREATION_TIME_KEY));
this.transaction = Optional.absent();
@@ -1148,8 +1174,6 @@ public class IcebergMetadataWriter implements
MetadataWriter {
this.newProperties = Optional.absent();
this.lowestGMCEEmittedTime = Long.MAX_VALUE;
this.lowWatermark = Optional.of(lowWaterMark);
- this.prevCompletenessWatermark = newCompletionWatermark;
- this.newCompletenessWatermark = DEFAULT_COMPLETION_WATERMARK;
this.datePartitions.clear();
}
}
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index da564fa3d..5e2e3d00d 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -423,12 +423,12 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
// Test when completeness watermark = -1 bootstrap case
KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
- Mockito.when(verifier.isComplete("testTopic", timestampMillis -
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis -
TimeUnit.HOURS.toMillis(1), timestampMillis)).thenReturn(true);
((IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
//completeness watermark = "2020-09-16-10"
- Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY),
"testIcebergTable");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis));
@@ -475,7 +475,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
new LongWatermark(60L))));
- Mockito.when(verifier.isComplete("testTopic", timestampMillis1 -
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
+ Mockito.when(verifier.isComplete("testIcebergTable", timestampMillis1 -
TimeUnit.HOURS.toMillis(1), timestampMillis1)).thenReturn(true);
gobblinMCEWriterWithCompletness.flush();
table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(timestampMillis1));
@@ -486,6 +486,41 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
}
+ @Test(dependsOnMethods={"testWriteAddFileGMCECompleteness"},
groups={"icebergMetadataWriterTest"})
+ public void testChangePropertyGMCECompleteness() throws IOException {
+
+ Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ long watermark =
Long.parseLong(table.properties().get(COMPLETION_WATERMARK_KEY));
+ long expectedWatermark = watermark + TimeUnit.HOURS.toMillis(1);
+ File hourlyFile2 = new File(tmpDir,
"testDB/testIcebergTable/hourly/2021/09/16/11/data.avro");
+ gmce.setOldFilePrefixes(null);
+ gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+ .setFilePath(hourlyFile2.toString())
+ .setFileFormat("avro")
+ .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+ .build()));
+ gmce.setOperationType(OperationType.change_property);
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "6000-7000").build());
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(),
gmce);
+ gobblinMCEWriterWithCompletness.writeEnvelope(new
RecordEnvelope<>(genericGmce,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(65L))));
+
+ KafkaAuditCountVerifier verifier =
Mockito.mock(TestAuditCountVerifier.class);
+ Mockito.when(verifier.isComplete("testIcebergTable", watermark,
expectedWatermark)).thenReturn(true);
+ ((IcebergMetadataWriter)
gobblinMCEWriterWithCompletness.metadataWriters.iterator().next()).setAuditCountVerifier(verifier);
+ gobblinMCEWriterWithCompletness.flush();
+
+ table = catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-7000");
+ Assert.assertEquals(table.spec().fields().get(1).name(), "late");
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY),
"testIcebergTable");
+
Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_TIMEZONE_KEY),
"America/Los_Angeles");
+ Assert.assertEquals(table.properties().get(COMPLETION_WATERMARK_KEY),
String.valueOf(expectedWatermark));
+
+ }
+
private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);