[ 
https://issues.apache.org/jira/browse/GOBBLIN-1960?focusedWorklogId=891673&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-891673
 ]

ASF GitHub Bot logged work on GOBBLIN-1960:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 22/Nov/23 00:49
            Start Date: 22/Nov/23 00:49
    Worklog Time Spent: 10m 
      Work Description: ZihanLi58 commented on code in PR #3833:
URL: https://github.com/apache/gobblin/pull/3833#discussion_r1401376524


##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -834,86 +834,115 @@ protected String getTopicName(TableIdentifier tid, 
TableMetadata tableMetadata)
   public void flush(String dbName, String tableName) throws IOException {
     Lock writeLock = readWriteLock.writeLock();
     writeLock.lock();
+    boolean transactionCommitted = false;
     try {
       TableIdentifier tid = TableIdentifier.of(dbName, tableName);
       TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new 
TableMetadata(this.conf));
-      if (tableMetadata.transaction.isPresent()) {
-        Transaction transaction = tableMetadata.transaction.get();
-        Map<String, String> props = tableMetadata.newProperties.or(
-            
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
-        //Set data offset range
-        setDatasetOffsetRange(tableMetadata, props);
-        String topicName = getTopicName(tid, tableMetadata);
-        if (tableMetadata.appendFiles.isPresent()) {
-          tableMetadata.appendFiles.get().commit();
-          try (Timer.Context context = new Timer().time()) {
-            sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
-            log.info("Sending audit counts for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
-          }
-          if (tableMetadata.completenessEnabled) {
-            updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
-                tableMetadata.totalCountCompletenessEnabled);
-          }
-        }
-        if (tableMetadata.deleteFiles.isPresent()) {
-          tableMetadata.deleteFiles.get().commit();
-        }
-        // Check and update completion watermark when there are no files to be 
registered, typically for quiet topics
-        // The logic is to check the window [currentHour-1,currentHour] and 
update the watermark if there are no audit counts
-        if(!tableMetadata.appendFiles.isPresent() && 
!tableMetadata.deleteFiles.isPresent()
-            && tableMetadata.completenessEnabled) {
-          updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
-              tableMetadata.totalCountCompletenessEnabled);
-        }
+      if (!tableMetadata.transaction.isPresent()) {
+        log.info("There's no transaction initiated for the table {}", tid);
+        return;
+      }
 
-        //Set high waterMark
-        Long highWatermark = tableCurrentWatermarkMap.get(tid);
-        props.put(String.format(GMCE_HIGH_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)), highWatermark.toString());
-        //Set low waterMark
-        props.put(String.format(GMCE_LOW_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)),
-            tableMetadata.lowWatermark.get().toString());
-        //Set whether to delete metadata files after commit
-        if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
-          props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
-              
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
-          props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
-              conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
-        }
-        //Update schema(commit)
-        updateSchema(tableMetadata, props, topicName);
-        //Update properties
-        UpdateProperties updateProperties = transaction.updateProperties();
-        props.forEach(updateProperties::set);
-        updateProperties.commit();
-        try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName);
-            Timer.Context context = new Timer().time()) {
-          transaction.commitTransaction();
-          log.info("Committing transaction for table {} took {} ms", tid, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+      Transaction transaction = tableMetadata.transaction.get();
+      Map<String, String> props = tableMetadata.newProperties.or(
+          
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+      //Set data offset range
+      setDatasetOffsetRange(tableMetadata, props);
+      String topicName = getTopicName(tid, tableMetadata);
+      if (tableMetadata.appendFiles.isPresent()) {
+        tableMetadata.appendFiles.get().commit();
+        if (tableMetadata.completenessEnabled) {
+          updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+              tableMetadata.totalCountCompletenessEnabled);
         }
+      }
 
-        // Emit GTE for snapshot commits
-        Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
-        Map<String, String> currentProps = 
tableMetadata.table.get().properties();
-        try (Timer.Context context = new Timer().time()) {
-          submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, 
tableName, currentProps, highWatermark);
-          log.info("Sending snapshot commit event for {} took {} ms", 
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
-        }
+      if (tableMetadata.deleteFiles.isPresent()) {
+        tableMetadata.deleteFiles.get().commit();
+      }
+      // Check and update completion watermark when there are no files to be 
registered, typically for quiet topics
+      // The logic is to check the window [currentHour-1,currentHour] and 
update the watermark if there are no audit counts
+      if(!tableMetadata.appendFiles.isPresent() && 
!tableMetadata.deleteFiles.isPresent()
+          && tableMetadata.completenessEnabled) {
+        updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+            tableMetadata.totalCountCompletenessEnabled);
+      }
 
-        //Reset the table metadata for next accumulation period
-        tableMetadata.reset(currentProps, highWatermark);
-        log.info(String.format("Finish commit of new snapshot %s for table 
%s", snapshot.snapshotId(), tid));
-      } else {
-        log.info("There's no transaction initiated for the table {}", tid);
+      //Set high waterMark
+      Long highWatermark = tableCurrentWatermarkMap.get(tid);
+      props.put(String.format(GMCE_HIGH_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+      //Set low waterMark
+      props.put(String.format(GMCE_LOW_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)),
+          tableMetadata.lowWatermark.get().toString());
+      //Set whether to delete metadata files after commit
+      if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+        props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
+            
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+        props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
+            conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+      }
+      //Update schema(commit)
+      updateSchema(tableMetadata, props, topicName);
+      //Update properties
+      UpdateProperties updateProperties = transaction.updateProperties();
+      props.forEach(updateProperties::set);
+      updateProperties.commit();
+      try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName);
+          Timer.Context context = new Timer().time()) {
+        transaction.commitTransaction();
+        log.info("Committing transaction for table {} took {} ms", tid, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+        transactionCommitted = true;
       }
-    } catch (RuntimeException e) {
-      throw new IOException(String.format("Fail to flush table %s %s", dbName, 
tableName), e);
+
+      postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+      //Reset the table metadata for next accumulation period
+      Map<String, String> currentProps = 
tableMetadata.table.get().properties();

Review Comment:
   Why not put these lines in postCommit?



##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -834,86 +834,115 @@ protected String getTopicName(TableIdentifier tid, 
TableMetadata tableMetadata)
   public void flush(String dbName, String tableName) throws IOException {
     Lock writeLock = readWriteLock.writeLock();
     writeLock.lock();
+    boolean transactionCommitted = false;
     try {
       TableIdentifier tid = TableIdentifier.of(dbName, tableName);
       TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new 
TableMetadata(this.conf));
-      if (tableMetadata.transaction.isPresent()) {
-        Transaction transaction = tableMetadata.transaction.get();
-        Map<String, String> props = tableMetadata.newProperties.or(
-            
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
-        //Set data offset range
-        setDatasetOffsetRange(tableMetadata, props);
-        String topicName = getTopicName(tid, tableMetadata);
-        if (tableMetadata.appendFiles.isPresent()) {
-          tableMetadata.appendFiles.get().commit();
-          try (Timer.Context context = new Timer().time()) {
-            sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
-            log.info("Sending audit counts for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
-          }
-          if (tableMetadata.completenessEnabled) {
-            updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
-                tableMetadata.totalCountCompletenessEnabled);
-          }
-        }
-        if (tableMetadata.deleteFiles.isPresent()) {
-          tableMetadata.deleteFiles.get().commit();
-        }
-        // Check and update completion watermark when there are no files to be 
registered, typically for quiet topics
-        // The logic is to check the window [currentHour-1,currentHour] and 
update the watermark if there are no audit counts
-        if(!tableMetadata.appendFiles.isPresent() && 
!tableMetadata.deleteFiles.isPresent()
-            && tableMetadata.completenessEnabled) {
-          updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
-              tableMetadata.totalCountCompletenessEnabled);
-        }
+      if (!tableMetadata.transaction.isPresent()) {
+        log.info("There's no transaction initiated for the table {}", tid);
+        return;
+      }
 
-        //Set high waterMark
-        Long highWatermark = tableCurrentWatermarkMap.get(tid);
-        props.put(String.format(GMCE_HIGH_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)), highWatermark.toString());
-        //Set low waterMark
-        props.put(String.format(GMCE_LOW_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)),
-            tableMetadata.lowWatermark.get().toString());
-        //Set whether to delete metadata files after commit
-        if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
-          props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
-              
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
-          props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
-              conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
-        }
-        //Update schema(commit)
-        updateSchema(tableMetadata, props, topicName);
-        //Update properties
-        UpdateProperties updateProperties = transaction.updateProperties();
-        props.forEach(updateProperties::set);
-        updateProperties.commit();
-        try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName);
-            Timer.Context context = new Timer().time()) {
-          transaction.commitTransaction();
-          log.info("Committing transaction for table {} took {} ms", tid, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+      Transaction transaction = tableMetadata.transaction.get();
+      Map<String, String> props = tableMetadata.newProperties.or(
+          
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+      //Set data offset range
+      setDatasetOffsetRange(tableMetadata, props);
+      String topicName = getTopicName(tid, tableMetadata);
+      if (tableMetadata.appendFiles.isPresent()) {
+        tableMetadata.appendFiles.get().commit();
+        if (tableMetadata.completenessEnabled) {
+          updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+              tableMetadata.totalCountCompletenessEnabled);
         }
+      }
 
-        // Emit GTE for snapshot commits
-        Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
-        Map<String, String> currentProps = 
tableMetadata.table.get().properties();
-        try (Timer.Context context = new Timer().time()) {
-          submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, 
tableName, currentProps, highWatermark);
-          log.info("Sending snapshot commit event for {} took {} ms", 
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
-        }
+      if (tableMetadata.deleteFiles.isPresent()) {
+        tableMetadata.deleteFiles.get().commit();
+      }
+      // Check and update completion watermark when there are no files to be 
registered, typically for quiet topics
+      // The logic is to check the window [currentHour-1,currentHour] and 
update the watermark if there are no audit counts
+      if(!tableMetadata.appendFiles.isPresent() && 
!tableMetadata.deleteFiles.isPresent()
+          && tableMetadata.completenessEnabled) {
+        updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+            tableMetadata.totalCountCompletenessEnabled);
+      }
 
-        //Reset the table metadata for next accumulation period
-        tableMetadata.reset(currentProps, highWatermark);
-        log.info(String.format("Finish commit of new snapshot %s for table 
%s", snapshot.snapshotId(), tid));
-      } else {
-        log.info("There's no transaction initiated for the table {}", tid);
+      //Set high waterMark
+      Long highWatermark = tableCurrentWatermarkMap.get(tid);
+      props.put(String.format(GMCE_HIGH_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+      //Set low waterMark
+      props.put(String.format(GMCE_LOW_WATERMARK_KEY, 
tableTopicPartitionMap.get(tid)),
+          tableMetadata.lowWatermark.get().toString());
+      //Set whether to delete metadata files after commit
+      if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY, 
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+        props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
Boolean.toString(
+            
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED, 
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+        props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
Integer.toString(
+            conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX, 
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+      }
+      //Update schema(commit)
+      updateSchema(tableMetadata, props, topicName);
+      //Update properties
+      UpdateProperties updateProperties = transaction.updateProperties();
+      props.forEach(updateProperties::set);
+      updateProperties.commit();
+      try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName, 
tableName);
+          Timer.Context context = new Timer().time()) {
+        transaction.commitTransaction();
+        log.info("Committing transaction for table {} took {} ms", tid, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+        transactionCommitted = true;
       }
-    } catch (RuntimeException e) {
-      throw new IOException(String.format("Fail to flush table %s %s", dbName, 
tableName), e);
+
+      postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+      //Reset the table metadata for next accumulation period
+      Map<String, String> currentProps = 
tableMetadata.table.get().properties();
+      Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+      tableMetadata.reset(currentProps, highWatermark);
+      log.info(String.format("Finish commit of new snapshot %s for table %s", 
snapshot.snapshotId(), tid));
     } catch (Exception e) {
-      throw new IOException(String.format("Fail to flush table %s %s", dbName, 
tableName), e);
+      throw new IOException(String.format("Failed to flush table %s %s. 
transactionCommitted=%s",
+          dbName, tableName, transactionCommitted), e);
     } finally {
       writeLock.unlock();
     }
   }
 
+  /**
+   * PostCommit operation that executes after the transaction is committed to 
the Iceberg table. Operations in this
+   * method are considered non-critical to the transaction and will not cause 
the transaction to fail if they fail,
+   * but should ideally still be executed for observability.
+   *
+   * One example of this is observability events / metrics like {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}.
+   * The default behavior is to emit a GTE for the commit event and a kafka 
audit event
+   *
+   * @param tableMetadata
+   * @param dbName
+   * @param tableName
+   * @param topicName
+   * @param highWatermark
+   * @throws IOException
+   */
+  protected void postCommit(
+      TableMetadata tableMetadata,
+      String dbName,
+      String tableName,
+      String topicName,
+      long highWatermark) throws IOException {
+    // Emit GTE for snapshot commits
+    Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+    Map<String, String> currentProps = tableMetadata.table.get().properties();
+    try (Timer.Context context = new Timer().time()) {
+      submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName, 
currentProps, highWatermark);
+      log.info("Sending snapshot commit event for {} took {} ms", topicName, 
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+    }
+
+    try (Timer.Context context = new Timer().time()) {
+      sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);

Review Comment:
   Could we send the audit counts first, to avoid under-counting as much as possible?





Issue Time Tracking
-------------------

            Worklog Id:     (was: 891673)
    Remaining Estimate: 0h
            Time Spent: 10m

> Emit audit count after commit in IcebergMetadataWriter
> ------------------------------------------------------
>
>                 Key: GOBBLIN-1960
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1960
>             Project: Apache Gobblin
>          Issue Type: Improvement
>            Reporter: Matthew Ho
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to