This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new bbbfe3bcc [GOBBLIN-1960] Emit audit count after commit in
IcebergMetadataWriter (#3833)
bbbfe3bcc is described below
commit bbbfe3bcc75f7a96c8c7c352f9dbfabe55ad67ad
Author: Matthew Ho <[email protected]>
AuthorDate: Wed Nov 29 14:26:01 2023 -0800
[GOBBLIN-1960] Emit audit count after commit in IcebergMetadataWriter
(#3833)
* Emit audit count after commit in IcebergMetadataWriter
* Unit tests by extracting to a post commit
* Emit audit count first
* find bugs complaint
---
.../iceberg/writer/IcebergMetadataWriter.java | 170 +++++++++++++--------
.../iceberg/writer/IcebergMetadataWriterTest.java | 99 +++++++++++-
2 files changed, 200 insertions(+), 69 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 713378b99..3da856edb 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -740,7 +740,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
}
dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file,
table.spec(), partition, conf, schemaIdMap));
} catch (Exception e) {
- log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(),
e);
+ log.warn("Cannot get DataFile for {} due to {}", file.getFilePath(),
e);
}
}
return dataFiles;
@@ -834,86 +834,122 @@ public class IcebergMetadataWriter implements
MetadataWriter {
public void flush(String dbName, String tableName) throws IOException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
+ boolean transactionCommitted = false;
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new
TableMetadata(this.conf));
- if (tableMetadata.transaction.isPresent()) {
- Transaction transaction = tableMetadata.transaction.get();
- Map<String, String> props = tableMetadata.newProperties.or(
-
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
- //Set data offset range
- setDatasetOffsetRange(tableMetadata, props);
- String topicName = getTopicName(tid, tableMetadata);
- if (tableMetadata.appendFiles.isPresent()) {
- tableMetadata.appendFiles.get().commit();
- try (Timer.Context context = new Timer().time()) {
- sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
- log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
- if (tableMetadata.completenessEnabled) {
- updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
- }
- if (tableMetadata.deleteFiles.isPresent()) {
- tableMetadata.deleteFiles.get().commit();
- }
- // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
- // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
- if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
- && tableMetadata.completenessEnabled) {
- updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
+ if (!tableMetadata.transaction.isPresent()) {
+ log.info("There's no transaction initiated for the table {}", tid);
+ return;
+ }
- //Set high waterMark
- Long highWatermark = tableCurrentWatermarkMap.get(tid);
- props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
- //Set low waterMark
- props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
- tableMetadata.lowWatermark.get().toString());
- //Set whether to delete metadata files after commit
- if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
- props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
-
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
- props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
- conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
- }
- //Update schema(commit)
- updateSchema(tableMetadata, props, topicName);
- //Update properties
- UpdateProperties updateProperties = transaction.updateProperties();
- props.forEach(updateProperties::set);
- updateProperties.commit();
- try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
- Timer.Context context = new Timer().time()) {
- transaction.commitTransaction();
- log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ //Set data offset range
+ setDatasetOffsetRange(tableMetadata, props);
+ String topicName = getTopicName(tid, tableMetadata);
+ if (tableMetadata.appendFiles.isPresent()) {
+ tableMetadata.appendFiles.get().commit();
+ if (tableMetadata.completenessEnabled) {
+ updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
}
+ }
- // Emit GTE for snapshot commits
- Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
- Map<String, String> currentProps =
tableMetadata.table.get().properties();
- try (Timer.Context context = new Timer().time()) {
- submitSnapshotCommitEvent(snapshot, tableMetadata, dbName,
tableName, currentProps, highWatermark);
- log.info("Sending snapshot commit event for {} took {} ms",
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
+ if (tableMetadata.deleteFiles.isPresent()) {
+ tableMetadata.deleteFiles.get().commit();
+ }
+ // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
+ // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
+ if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
+ && tableMetadata.completenessEnabled) {
+ updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
+ }
- //Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark);
- log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid));
- } else {
- log.info("There's no transaction initiated for the table {}", tid);
+ //Set high waterMark
+ Long highWatermark = tableCurrentWatermarkMap.get(tid);
+ props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+ //Set low waterMark
+ props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
+ tableMetadata.lowWatermark.get().toString());
+ //Set whether to delete metadata files after commit
+ if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+ props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
+
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+ props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
+ conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+ }
+ //Update schema(commit)
+ updateSchema(tableMetadata, props, topicName);
+ //Update properties
+ UpdateProperties updateProperties = transaction.updateProperties();
+ props.forEach(updateProperties::set);
+ updateProperties.commit();
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
+ Timer.Context context = new Timer().time()) {
+ transaction.commitTransaction();
+ log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ transactionCommitted = true;
}
+
+ postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+ //Reset the table metadata for next accumulation period
+ Map<String, String> currentProps =
tableMetadata.table.get().properties();
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ tableMetadata.reset(currentProps, highWatermark);
+ log.info(String.format("Finish commit of new snapshot %s for table %s",
snapshot.snapshotId(), tid));
} catch (RuntimeException e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+ throw new IOException(String.format("Failed to flush table %s %s.
transactionCommitted=%s",
+ dbName, tableName, transactionCommitted), e);
} catch (Exception e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+ throw new IOException(String.format("Failed to flush table %s %s.
transactionCommitted=%s",
+ dbName, tableName, transactionCommitted), e);
} finally {
writeLock.unlock();
}
}
+ /**
+ * PostCommit operation that executes after the transaction is committed to
the Iceberg table. Operations in this
+ * method are considered non-critical to the transaction and will not cause
the transaction to fail if they fail,
+ * but should ideally still be executed for observability.
+ *
+ * One example of this is observability events / metrics like {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}.
+ * The default behavior is to emit a GTE for the commit event and a kafka
audit event
+ *
+ * @param tableMetadata
+ * @param dbName
+ * @param tableName
+ * @param topicName
+ * @param highWatermark
+ * @throws IOException
+ */
+ protected void postCommit(
+ TableMetadata tableMetadata,
+ String dbName,
+ String tableName,
+ String topicName,
+ long highWatermark) throws IOException {
+ // Emit GTE for snapshot commits
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ Map<String, String> currentProps = tableMetadata.table.get().properties();
+
+ // Sending the audit count before the snapshot commit event because
downstream users are more likely
+ // to consume this audit count API for determining completion since it is
agnostic to the system (e.g. Kafka, Brooklin)
+ // The snapshot commit event is more for internal monitoring.
+ try (Timer.Context context = new Timer().time()) {
+ sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+ log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
+
+ try (Timer.Context context = new Timer().time()) {
+ submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName,
currentProps, highWatermark);
+ log.info("Sending snapshot commit event for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
+ }
+
private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName,
TableMetadata tableMetadata,
Map<String, String> propsToUpdate) {
return new CompletenessWatermarkUpdater(topicName,
this.auditCheckGranularity, this.timeZone,
@@ -946,7 +982,7 @@ public class IcebergMetadataWriter implements
MetadataWriter {
this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
}
- private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata
tableMetadata, String dbName,
+ protected void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata
tableMetadata, String dbName,
String tableName, Map<String, String> props, Long highWaterMark) {
GobblinEventBuilder gobblinTrackingEvent =
new GobblinEventBuilder(MetadataWriterKeys.ICEBERG_COMMIT_EVENT_NAME);
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 6feb8adae..e0e2f4f4f 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -22,8 +22,16 @@ import java.io.IOException;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
@@ -39,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.iceberg.FindFiles;
+import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.expressions.Expressions;
@@ -405,7 +414,7 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
@Test(dependsOnMethods={"testChangeProperty"},
groups={"icebergMetadataWriterTest"})
public void testWriteAddFileGMCECompleteness() throws IOException {
- // Creating a copy of gmce with static type in GenericRecord to work with
writeEnvelop method
+ // Creating a copy of gmce with static type in GenericRecord to work with
writeEnvelope method
// without risking running into type cast runtime error.
gmce.setOperationType(OperationType.add_files);
File hourlyFile = new File(tmpDir,
"testDB/testTopicCompleteness/hourly/2021/09/16/10/data.avro");
@@ -546,6 +555,57 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
String.valueOf(expectedWatermark));
}
+ @Test(dependsOnMethods={"testChangePropertyGMCECompleteness"},
groups={"icebergMetadataWriterTest"})
+ public void testKafkaAuditAndGTEEmittedAfterIcebergCommitDuringFlush()
throws IOException {
+ State state = getState();
+ state.setProp(GobblinMCEWriter.GMCE_METADATA_WRITER_CLASSES,
SpyIcebergMetadataWriter.class.getName());
+ GobblinMCEWriter gobblinMCEWriterWithSpy = new GobblinMCEWriter(new
GobblinMCEWriterBuilder(), state);
+ // Set fault tolerant dataset number to be 1 so watermark is updated
+ gobblinMCEWriterWithSpy.setMaxErrorDataset(1);
+
+ Assert.assertEquals(gobblinMCEWriterWithSpy.metadataWriters.size(), 1);
+
Assert.assertEquals(gobblinMCEWriterWithSpy.metadataWriters.get(0).getClass().getName(),
SpyIcebergMetadataWriter.class.getName());
+ SpyIcebergMetadataWriter spyIcebergMetadataWriter =
+ (SpyIcebergMetadataWriter)
gobblinMCEWriterWithSpy.metadataWriters.get(0);
+
+
+ // Write a file whose registration succeeds; the spy then forces postCommit to fail
+ File hourlyFile = new File(tmpDir,
"testDB/testTopic/hourly/2021/09/16/11/failAfterCommit.avro");
+ Files.createParentDirs(hourlyFile);
+ writeRecord(hourlyFile);
+ Assert.assertTrue(hourlyFile.exists());
+ gmce.setOldFilePrefixes(null);
+ gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+ .setFilePath(hourlyFile.toString())
+ .setFileFormat("avro")
+ .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+ .build()));
+ gmce.setOperationType(OperationType.add_files);
+ gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String,
String>builder().put("testTopic-1", "4000-4001").build());
+ gmce.setAllowedMetadataWriters(new ArrayList<>());
+ GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(),
gmce);
+ gobblinMCEWriterWithSpy.writeEnvelope(new RecordEnvelope<>(genericGmce,
+ new KafkaStreamingExtractor.KafkaWatermark(
+ new
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+ new LongWatermark(70L))));
+
+ gobblinMCEWriterWithSpy.flush();
+
+ Table table =
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+ Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
+
+ // get the file that matches the path of the file that was added
+ Iterator<org.apache.iceberg.DataFile> result =
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path",
hourlyFile.getAbsolutePath())).collect().iterator();
+ Assert.assertTrue(result.hasNext());
+ Assert.assertEquals(result.next().path(), hourlyFile.getAbsolutePath());
+ Assert.assertEquals(table.properties().get("offset.range.testTopic-1"),
"0-4001");
+
+ // postCommit was invoked once but threw before emitting the audit count or the snapshot commit event
+
Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("postCommit").get(),
1);
+
Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("sendAuditCounts"),
null);
+
Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("submitSnapshotCommitEvent"),
null);
+ }
+
private String writeRecord(File file) throws IOException {
GenericData.Record record = new GenericData.Record(avroDataSchema);
record.put("id", 1L);
@@ -621,5 +681,40 @@ public class IcebergMetadataWriterTest extends
HiveMetastoreTest {
super(state, client);
}
}
+
+ /**
+ * A spy class for IcebergMetadataWriter to track the methods called and
intentionally
+ * invoke failure after the iceberg transaction is committed
+ */
+ public static class SpyIcebergMetadataWriter extends IcebergMetadataWriter {
+ public Map<String, AtomicInteger> methodsCalledCounter = new HashMap<>();
+
+ public SpyIcebergMetadataWriter(State state)
+ throws IOException {
+ super(state);
+ }
+
+ protected void postCommit(TableMetadata tableMetadata, String dbName,
String tableName, String topicName,
+ long highWatermark) {
+ String methodName = new Object()
{}.getClass().getEnclosingMethod().getName();
+ methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
+ methodsCalledCounter.get(methodName).incrementAndGet();
+ throw new RuntimeException("Intentionally aborting postcommit for
testing");
+ }
+
+ @Override
+ public void sendAuditCounts(String topicName, Collection<String>
serializedAuditCountMaps) {
+ String methodName = new Object()
{}.getClass().getEnclosingMethod().getName();
+ methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
+ methodsCalledCounter.get(methodName).incrementAndGet();
+ }
+
+ protected void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata
tableMetadata, String dbName,
+ String tableName, Map<String, String> props, Long highWaterMark) {
+ String methodName = new Object()
{}.getClass().getEnclosingMethod().getName();
+ methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
+ methodsCalledCounter.get(methodName).incrementAndGet();
+ }
+ }
}