This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new bbbfe3bcc [GOBBLIN-1960] Emit audit count after commit in 
IcebergMetadataWriter (#3833)
bbbfe3bcc is described below

commit bbbfe3bcc75f7a96c8c7c352f9dbfabe55ad67ad
Author: Matthew Ho <[email protected]>
AuthorDate: Wed Nov 29 14:26:01 2023 -0800

    [GOBBLIN-1960] Emit audit count after commit in IcebergMetadataWriter 
(#3833)
    
    * Emit audit count after commit in IcebergMetadataWriter
    
    * Unit tests by extracting to a post commit
    
    * Emit audit count first
    
    * find bugs complaint
---
 .../iceberg/writer/IcebergMetadataWriter.java      | 170 +++++++++++++--------
 .../iceberg/writer/IcebergMetadataWriterTest.java  |  99 +++++++++++-
 2 files changed, 200 insertions(+), 69 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 713378b99..3da856edb 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -740,7 +740,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
         }
         dataFiles.add(IcebergUtils.getIcebergDataFileWithMetric(file, 
table.spec(), partition, conf, schemaIdMap));
       } catch (Exception e) {
-        log.warn("Cannot get DataFile for {} dur to {}", file.getFilePath(), 
e);
+        log.warn("Cannot get DataFile for {} due to {}", file.getFilePath(), 
e);
       }
     }
     return dataFiles;
@@ -834,86 +834,122 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
   public void flush(String dbName, String tableName) throws IOException {
     Lock writeLock = readWriteLock.writeLock();
     writeLock.lock();
+    boolean transactionCommitted = false;
     try {
       TableIdentifier tid = TableIdentifier.of(dbName, tableName);
       TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new 
TableMetadata(this.conf));
-      if (tableMetadata.transaction.isPresent()) {
-        Transaction transaction = tableMetadata.transaction.get();
-        Map<String, String> props = tableMetadata.newProperties.or(
-            
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
-        //Set data offset range
-        setDatasetOffsetRange(tableMetadata, props);
-        String topicName = getTopicName(tid, tableMetadata);
-        if (tableMetadata.appendFiles.isPresent()) {
-          tableMetadata.appendFiles.get().commit();
-          try (Timer.Context context = new Timer().time()) {
-            sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
-            log.info("Sending audit counts for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
-          }
-          if (tableMetadata.completenessEnabled) {
-            updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
-                tableMetadata.totalCountCompletenessEnabled);
-          }
-        }
-        if (tableMetadata.deleteFiles.isPresent()) {
-          tableMetadata.deleteFiles.get().commit();
-        }
-        // Check and update completion watermark when there are no files to be 
registered, typically for quiet topics
-        // The logic is to check the window [currentHour-1,currentHour] and 
update the watermark if there are no audit counts
-        if(!tableMetadata.appendFiles.isPresent() && 
!tableMetadata.deleteFiles.isPresent()
-            && tableMetadata.completenessEnabled) {
-          updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
-              tableMetadata.totalCountCompletenessEnabled);
-        }
+      if (!tableMetadata.transaction.isPresent()) {
+        log.info("There's no transaction initiated for the table {}", tid);
+        return;
+      }
 
-        //Set high waterMark
-        Long highWatermark = tableCurrentWatermarkMap.get(tid);
-        props.put(String.format(GMCE_HIGH_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)), highWatermark.toString());
-        //Set low waterMark
-        props.put(String.format(GMCE_LOW_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)),
-            tableMetadata.lowWatermark.get().toString());
-        //Set whether to delete metadata files after commit
-        if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
-          props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
-              
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
-          props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
-              conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
-        }
-        //Update schema(commit)
-        updateSchema(tableMetadata, props, topicName);
-        //Update properties
-        UpdateProperties updateProperties = transaction.updateProperties();
-        props.forEach(updateProperties::set);
-        updateProperties.commit();
-        try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName);
-            Timer.Context context = new Timer().time()) {
-          transaction.commitTransaction();
-          log.info("Committing transaction for table {} took {} ms", tid, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+      Transaction transaction = tableMetadata.transaction.get();
+      Map<String, String> props = tableMetadata.newProperties.or(
+          
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+      //Set data offset range
+      setDatasetOffsetRange(tableMetadata, props);
+      String topicName = getTopicName(tid, tableMetadata);
+      if (tableMetadata.appendFiles.isPresent()) {
+        tableMetadata.appendFiles.get().commit();
+        if (tableMetadata.completenessEnabled) {
+          updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+              tableMetadata.totalCountCompletenessEnabled);
         }
+      }
 
-        // Emit GTE for snapshot commits
-        Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
-        Map<String, String> currentProps = 
tableMetadata.table.get().properties();
-        try (Timer.Context context = new Timer().time()) {
-          submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, 
tableName, currentProps, highWatermark);
-          log.info("Sending snapshot commit event for {} took {} ms", 
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
-        }
+      if (tableMetadata.deleteFiles.isPresent()) {
+        tableMetadata.deleteFiles.get().commit();
+      }
+      // Check and update completion watermark when there are no files to be 
registered, typically for quiet topics
+      // The logic is to check the window [currentHour-1,currentHour] and 
update the watermark if there are no audit counts
+      if(!tableMetadata.appendFiles.isPresent() && 
!tableMetadata.deleteFiles.isPresent()
+          && tableMetadata.completenessEnabled) {
+        updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+            tableMetadata.totalCountCompletenessEnabled);
+      }
 
-        //Reset the table metadata for next accumulation period
-        tableMetadata.reset(currentProps, highWatermark);
-        log.info(String.format("Finish commit of new snapshot %s for table 
%s", snapshot.snapshotId(), tid));
-      } else {
-        log.info("There's no transaction initiated for the table {}", tid);
+      //Set high waterMark
+      Long highWatermark = tableCurrentWatermarkMap.get(tid);
+      props.put(String.format(GMCE_HIGH_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+      //Set low waterMark
+      props.put(String.format(GMCE_LOW_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)),
+          tableMetadata.lowWatermark.get().toString());
+      //Set whether to delete metadata files after commit
+      if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+        props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
+            
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+        props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
+            conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+      }
+      //Update schema(commit)
+      updateSchema(tableMetadata, props, topicName);
+      //Update properties
+      UpdateProperties updateProperties = transaction.updateProperties();
+      props.forEach(updateProperties::set);
+      updateProperties.commit();
+      try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName);
+          Timer.Context context = new Timer().time()) {
+        transaction.commitTransaction();
+        log.info("Committing transaction for table {} took {} ms", tid, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+        transactionCommitted = true;
       }
+
+      postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+      //Reset the table metadata for next accumulation period
+      Map<String, String> currentProps = 
tableMetadata.table.get().properties();
+      Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+      tableMetadata.reset(currentProps, highWatermark);
+      log.info(String.format("Finish commit of new snapshot %s for table %s", 
snapshot.snapshotId(), tid));
     } catch (RuntimeException e) {
-      throw new IOException(String.format("Fail to flush table %s %s", dbName, 
tableName), e);
+      throw new IOException(String.format("Failed to flush table %s %s. 
transactionCommitted=%s",
+          dbName, tableName, transactionCommitted), e);
     } catch (Exception e) {
-      throw new IOException(String.format("Fail to flush table %s %s", dbName, 
tableName), e);
+      throw new IOException(String.format("Failed to flush table %s %s. 
transactionCommitted=%s",
+          dbName, tableName, transactionCommitted), e);
     } finally {
       writeLock.unlock();
     }
   }
 
+  /**
+   * PostCommit operation that executes after the transaction is committed to 
the Iceberg table. Operations in this
+   * method are considered non-critical to the transaction and will not cause 
the transaction to fail if they fail,
+   * but should ideally still be executed for observability.
+   *
+   * One example of this is observability events / metrics like {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}.
+   * The default behavior is to emit a GTE for the commit event and a kafka 
audit event
+   *
+   * @param tableMetadata
+   * @param dbName
+   * @param tableName
+   * @param topicName
+   * @param highWatermark
+   * @throws IOException
+   */
+  protected void postCommit(
+      TableMetadata tableMetadata,
+      String dbName,
+      String tableName,
+      String topicName,
+      long highWatermark) throws IOException {
+    // Emit GTE for snapshot commits
+    Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+    Map<String, String> currentProps = tableMetadata.table.get().properties();
+
+    // Sending the audit count before the snapshot commit event because 
downstream users are more likely
+    // to consume this audit count API for determining completion since it is 
agnostic to the system (e.g. Kafka, Brooklin)
+    // The snapshot commit event is more for internal monitoring.
+    try (Timer.Context context = new Timer().time()) {
+      sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
+      log.info("Sending audit counts for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+    }
+
+    try (Timer.Context context = new Timer().time()) {
+      submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, 
currentProps, highWatermark);
+      log.info("Sending snapshot commit event for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+    }
+  }
+
   private CompletenessWatermarkUpdater getWatermarkUpdater(String topicName, 
TableMetadata tableMetadata,
       Map<String, String> propsToUpdate) {
     return new CompletenessWatermarkUpdater(topicName, 
this.auditCheckGranularity, this.timeZone,
@@ -946,7 +982,7 @@ public class IcebergMetadataWriter implements 
MetadataWriter {
     this.tableMetadataMap.remove(TableIdentifier.of(dbName, tableName));
   }
 
-  private void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata 
tableMetadata, String dbName,
+  protected void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata 
tableMetadata, String dbName,
       String tableName, Map<String, String> props, Long highWaterMark) {
     GobblinEventBuilder gobblinTrackingEvent =
         new GobblinEventBuilder(MetadataWriterKeys.ICEBERG_COMMIT_EVENT_NAME);
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
index 6feb8adae..e0e2f4f4f 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriterTest.java
@@ -22,8 +22,16 @@ import java.io.IOException;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
 import java.time.temporal.ChronoUnit;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.file.DataFileWriter;
@@ -39,6 +47,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.iceberg.FindFiles;
+import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.expressions.Expressions;
@@ -405,7 +414,7 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
 
   @Test(dependsOnMethods={"testChangeProperty"}, 
groups={"icebergMetadataWriterTest"})
   public void testWriteAddFileGMCECompleteness() throws IOException {
-    // Creating a copy of gmce with static type in GenericRecord to work with 
writeEnvelop method
+    // Creating a copy of gmce with static type in GenericRecord to work with 
writeEnvelope method
     // without risking running into type cast runtime error.
     gmce.setOperationType(OperationType.add_files);
     File hourlyFile = new File(tmpDir, 
"testDB/testTopicCompleteness/hourly/2021/09/16/10/data.avro");
@@ -546,6 +555,57 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
     
Assert.assertEquals(table.properties().get(TOTAL_COUNT_COMPLETION_WATERMARK_KEY),
 String.valueOf(expectedWatermark));
   }
 
+  @Test(dependsOnMethods={"testChangePropertyGMCECompleteness"}, 
groups={"icebergMetadataWriterTest"})
+  public void testKafkaAuditAndGTEEmittedAfterIcebergCommitDuringFlush() 
throws IOException {
+    State state = getState();
+    state.setProp(GobblinMCEWriter.GMCE_METADATA_WRITER_CLASSES, 
SpyIcebergMetadataWriter.class.getName());
+    GobblinMCEWriter gobblinMCEWriterWithSpy = new GobblinMCEWriter(new 
GobblinMCEWriterBuilder(), state);
+    // Set fault tolerant dataset number to be 1 so watermark is updated
+    gobblinMCEWriterWithSpy.setMaxErrorDataset(1);
+
+    Assert.assertEquals(gobblinMCEWriterWithSpy.metadataWriters.size(), 1);
+    
Assert.assertEquals(gobblinMCEWriterWithSpy.metadataWriters.get(0).getClass().getName(),
 SpyIcebergMetadataWriter.class.getName());
+    SpyIcebergMetadataWriter spyIcebergMetadataWriter =
+        (SpyIcebergMetadataWriter) 
gobblinMCEWriterWithSpy.metadataWriters.get(0);
+
+
+    // For quiet topics, watermark should always be beginning of current hour
+    File hourlyFile = new File(tmpDir, 
"testDB/testTopic/hourly/2021/09/16/11/failAfterCommit.avro");
+    Files.createParentDirs(hourlyFile);
+    writeRecord(hourlyFile);
+    Assert.assertTrue(hourlyFile.exists());
+    gmce.setOldFilePrefixes(null);
+    gmce.setNewFiles(Lists.newArrayList(DataFile.newBuilder()
+        .setFilePath(hourlyFile.toString())
+        .setFileFormat("avro")
+        .setFileMetrics(DataMetrics.newBuilder().setRecordCount(10L).build())
+        .build()));
+    gmce.setOperationType(OperationType.add_files);
+    gmce.setTopicPartitionOffsetsRange(ImmutableMap.<String, 
String>builder().put("testTopic-1", "4000-4001").build());
+    gmce.setAllowedMetadataWriters(new ArrayList<>());
+    GenericRecord genericGmce = GenericData.get().deepCopy(gmce.getSchema(), 
gmce);
+    gobblinMCEWriterWithSpy.writeEnvelope(new RecordEnvelope<>(genericGmce,
+        new KafkaStreamingExtractor.KafkaWatermark(
+            new 
KafkaPartition.Builder().withTopicName("GobblinMetadataChangeEvent_test").withId(1).build(),
+            new LongWatermark(70L))));
+
+    gobblinMCEWriterWithSpy.flush();
+
+    Table table = 
catalog.loadTable(catalog.listTables(Namespace.of(dbName)).get(0));
+    Assert.assertEquals(table.properties().get(TOPIC_NAME_KEY), "testTopic");
+
+    // get the file that patches the path of the file that failed to be added
+    Iterator<org.apache.iceberg.DataFile> result = 
FindFiles.in(table).withMetadataMatching(Expressions.startsWith("file_path", 
hourlyFile.getAbsolutePath())).collect().iterator();
+    Assert.assertTrue(result.hasNext());
+    Assert.assertEquals(result.next().path(), hourlyFile.getAbsolutePath());
+    Assert.assertEquals(table.properties().get("offset.range.testTopic-1"), 
"0-4001");
+
+    // The audit count
+    
Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("postCommit").get(),
 1);
+    
Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("sendAuditCounts"),
 null);
+    
Assert.assertEquals(spyIcebergMetadataWriter.methodsCalledCounter.get("submitSnapshotCommitEvent"),
 null);
+  }
+
   private String writeRecord(File file) throws IOException {
     GenericData.Record record = new GenericData.Record(avroDataSchema);
     record.put("id", 1L);
@@ -621,5 +681,40 @@ public class IcebergMetadataWriterTest extends 
HiveMetastoreTest {
       super(state, client);
     }
   }
+
+  /**
+   * A spy class for IcebergMetadataWriter to track the methods called and 
intentionally
+   * invoke failure after the iceberg transaction is committed
+   */
+  public static class SpyIcebergMetadataWriter extends IcebergMetadataWriter {
+    public Map<String, AtomicInteger> methodsCalledCounter = new HashMap<>();
+
+    public SpyIcebergMetadataWriter(State state)
+        throws IOException {
+      super(state);
+    }
+
+    protected void postCommit(TableMetadata tableMetadata, String dbName, 
String tableName, String topicName,
+        long highWatermark) {
+      String methodName = new Object() 
{}.getClass().getEnclosingMethod().getName();
+      methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
+      methodsCalledCounter.get(methodName).incrementAndGet();
+      throw new RuntimeException("Intentionally aborting postcommit for 
testing");
+    }
+
+    @Override
+    public void sendAuditCounts(String topicName, Collection<String> 
serializedAuditCountMaps) {
+      String methodName = new Object() 
{}.getClass().getEnclosingMethod().getName();
+      methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
+      methodsCalledCounter.get(methodName).incrementAndGet();
+    }
+
+    protected void submitSnapshotCommitEvent(Snapshot snapshot, TableMetadata 
tableMetadata, String dbName,
+        String tableName, Map<String, String> props, Long highWaterMark) {
+      String methodName = new Object() 
{}.getClass().getEnclosingMethod().getName();
+      methodsCalledCounter.putIfAbsent(methodName, new AtomicInteger(0));
+      methodsCalledCounter.get(methodName).incrementAndGet();
+    }
+  }
 }
 

Reply via email to