ZihanLi58 commented on code in PR #3833:
URL: https://github.com/apache/gobblin/pull/3833#discussion_r1408467756
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -834,86 +834,115 @@ protected String getTopicName(TableIdentifier tid,
TableMetadata tableMetadata)
public void flush(String dbName, String tableName) throws IOException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
+ boolean transactionCommitted = false;
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new
TableMetadata(this.conf));
- if (tableMetadata.transaction.isPresent()) {
- Transaction transaction = tableMetadata.transaction.get();
- Map<String, String> props = tableMetadata.newProperties.or(
-
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
- //Set data offset range
- setDatasetOffsetRange(tableMetadata, props);
- String topicName = getTopicName(tid, tableMetadata);
- if (tableMetadata.appendFiles.isPresent()) {
- tableMetadata.appendFiles.get().commit();
- try (Timer.Context context = new Timer().time()) {
- sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
- log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
- if (tableMetadata.completenessEnabled) {
- updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
- }
- if (tableMetadata.deleteFiles.isPresent()) {
- tableMetadata.deleteFiles.get().commit();
- }
- // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
- // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
- if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
- && tableMetadata.completenessEnabled) {
- updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
+ if (!tableMetadata.transaction.isPresent()) {
+ log.info("There's no transaction initiated for the table {}", tid);
+ return;
+ }
- //Set high waterMark
- Long highWatermark = tableCurrentWatermarkMap.get(tid);
- props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
- //Set low waterMark
- props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
- tableMetadata.lowWatermark.get().toString());
- //Set whether to delete metadata files after commit
- if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
- props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
-
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
- props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
- conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
- }
- //Update schema(commit)
- updateSchema(tableMetadata, props, topicName);
- //Update properties
- UpdateProperties updateProperties = transaction.updateProperties();
- props.forEach(updateProperties::set);
- updateProperties.commit();
- try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
- Timer.Context context = new Timer().time()) {
- transaction.commitTransaction();
- log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ //Set data offset range
+ setDatasetOffsetRange(tableMetadata, props);
+ String topicName = getTopicName(tid, tableMetadata);
+ if (tableMetadata.appendFiles.isPresent()) {
+ tableMetadata.appendFiles.get().commit();
+ if (tableMetadata.completenessEnabled) {
+ updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
}
+ }
- // Emit GTE for snapshot commits
- Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
- Map<String, String> currentProps =
tableMetadata.table.get().properties();
- try (Timer.Context context = new Timer().time()) {
- submitSnapshotCommitEvent(snapshot, tableMetadata, dbName,
tableName, currentProps, highWatermark);
- log.info("Sending snapshot commit event for {} took {} ms",
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
+ if (tableMetadata.deleteFiles.isPresent()) {
+ tableMetadata.deleteFiles.get().commit();
+ }
+ // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
+ // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
+ if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
+ && tableMetadata.completenessEnabled) {
+ updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
+ }
- //Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark);
- log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid));
- } else {
- log.info("There's no transaction initiated for the table {}", tid);
+ //Set high waterMark
+ Long highWatermark = tableCurrentWatermarkMap.get(tid);
+ props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+ //Set low waterMark
+ props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
+ tableMetadata.lowWatermark.get().toString());
+ //Set whether to delete metadata files after commit
+ if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+ props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
+
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+ props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
+ conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+ }
+ //Update schema(commit)
+ updateSchema(tableMetadata, props, topicName);
+ //Update properties
+ UpdateProperties updateProperties = transaction.updateProperties();
+ props.forEach(updateProperties::set);
+ updateProperties.commit();
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
+ Timer.Context context = new Timer().time()) {
+ transaction.commitTransaction();
+ log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ transactionCommitted = true;
}
- } catch (RuntimeException e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+
+ postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+ //Reset the table metadata for next accumulation period
+ Map<String, String> currentProps =
tableMetadata.table.get().properties();
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ tableMetadata.reset(currentProps, highWatermark);
+ log.info(String.format("Finish commit of new snapshot %s for table %s",
snapshot.snapshotId(), tid));
} catch (Exception e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+ throw new IOException(String.format("Failed to flush table %s %s.
transactionCommitted=%s",
+ dbName, tableName, transactionCommitted), e);
} finally {
writeLock.unlock();
}
}
+ /**
+ * PostCommit operation that executes after the transaction is committed to
the Iceberg table. Operations in this
+ * method are considered non-critical to the transaction and will not cause
the transaction to fail if they fail,
+ * but should ideally still be executed for observability.
+ *
+ * One example of this is observability events / metrics like {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}.
+ * The default behavior is to emit a GTE for the commit event and a kafka
audit event
+ *
+ * @param tableMetadata
+ * @param dbName
+ * @param tableName
+ * @param topicName
+ * @param highWatermark
+ * @throws IOException
+ */
+ protected void postCommit(
+ TableMetadata tableMetadata,
+ String dbName,
+ String tableName,
+ String topicName,
+ long highWatermark) throws IOException {
+ // Emit GTE for snapshot commits
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ Map<String, String> currentProps = tableMetadata.table.get().properties();
+ try (Timer.Context context = new Timer().time()) {
+ submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName,
currentProps, highWatermark);
+ log.info("Sending snapshot commit event for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
+
+ try (Timer.Context context = new Timer().time()) {
+ sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
Review Comment:
+1. In addition, missing one GTE won't trigger an alert in our monitoring
system, but failing to send the audit counts will leave us with permanently
lower counts.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]