homatthew commented on code in PR #3833:
URL: https://github.com/apache/gobblin/pull/3833#discussion_r1408141406
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -834,86 +834,115 @@ protected String getTopicName(TableIdentifier tid,
TableMetadata tableMetadata)
public void flush(String dbName, String tableName) throws IOException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
+ boolean transactionCommitted = false;
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new
TableMetadata(this.conf));
- if (tableMetadata.transaction.isPresent()) {
- Transaction transaction = tableMetadata.transaction.get();
- Map<String, String> props = tableMetadata.newProperties.or(
-
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
- //Set data offset range
- setDatasetOffsetRange(tableMetadata, props);
- String topicName = getTopicName(tid, tableMetadata);
- if (tableMetadata.appendFiles.isPresent()) {
- tableMetadata.appendFiles.get().commit();
- try (Timer.Context context = new Timer().time()) {
- sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
- log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
- if (tableMetadata.completenessEnabled) {
- updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
- }
- if (tableMetadata.deleteFiles.isPresent()) {
- tableMetadata.deleteFiles.get().commit();
- }
- // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
- // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
- if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
- && tableMetadata.completenessEnabled) {
- updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
+ if (!tableMetadata.transaction.isPresent()) {
+ log.info("There's no transaction initiated for the table {}", tid);
+ return;
+ }
- //Set high waterMark
- Long highWatermark = tableCurrentWatermarkMap.get(tid);
- props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
- //Set low waterMark
- props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
- tableMetadata.lowWatermark.get().toString());
- //Set whether to delete metadata files after commit
- if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
- props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
-
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
- props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
- conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
- }
- //Update schema(commit)
- updateSchema(tableMetadata, props, topicName);
- //Update properties
- UpdateProperties updateProperties = transaction.updateProperties();
- props.forEach(updateProperties::set);
- updateProperties.commit();
- try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
- Timer.Context context = new Timer().time()) {
- transaction.commitTransaction();
- log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ //Set data offset range
+ setDatasetOffsetRange(tableMetadata, props);
+ String topicName = getTopicName(tid, tableMetadata);
+ if (tableMetadata.appendFiles.isPresent()) {
+ tableMetadata.appendFiles.get().commit();
+ if (tableMetadata.completenessEnabled) {
+ updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
}
+ }
- // Emit GTE for snapshot commits
- Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
- Map<String, String> currentProps =
tableMetadata.table.get().properties();
- try (Timer.Context context = new Timer().time()) {
- submitSnapshotCommitEvent(snapshot, tableMetadata, dbName,
tableName, currentProps, highWatermark);
- log.info("Sending snapshot commit event for {} took {} ms",
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
+ if (tableMetadata.deleteFiles.isPresent()) {
+ tableMetadata.deleteFiles.get().commit();
+ }
+ // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
+ // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
+ if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
+ && tableMetadata.completenessEnabled) {
+ updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
+ }
- //Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark);
- log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid));
- } else {
- log.info("There's no transaction initiated for the table {}", tid);
+ //Set high waterMark
+ Long highWatermark = tableCurrentWatermarkMap.get(tid);
+ props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+ //Set low waterMark
+ props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
+ tableMetadata.lowWatermark.get().toString());
+ //Set whether to delete metadata files after commit
+ if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+ props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
+
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+ props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
+ conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+ }
+ //Update schema(commit)
+ updateSchema(tableMetadata, props, topicName);
+ //Update properties
+ UpdateProperties updateProperties = transaction.updateProperties();
+ props.forEach(updateProperties::set);
+ updateProperties.commit();
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
+ Timer.Context context = new Timer().time()) {
+ transaction.commitTransaction();
+ log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ transactionCommitted = true;
}
- } catch (RuntimeException e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+
+ postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+ //Reset the table metadata for next accumulation period
+ Map<String, String> currentProps =
tableMetadata.table.get().properties();
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ tableMetadata.reset(currentProps, highWatermark);
+ log.info(String.format("Finish commit of new snapshot %s for table %s",
snapshot.snapshotId(), tid));
} catch (Exception e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+ throw new IOException(String.format("Failed to flush table %s %s.
transactionCommitted=%s",
+ dbName, tableName, transactionCommitted), e);
} finally {
writeLock.unlock();
}
}
+ /**
+ * PostCommit operation that executes after the transaction is committed to
the Iceberg table. Operations in this
+ * method are considered non-critical to the transaction and will not cause
the transaction to fail if they fail,
+ * but should ideally still be executed for observability.
+ *
+ * One example of this is observability events / metrics like {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}.
+ * The default behavior is to emit a GTE for the commit event and a kafka
audit event
+ *
+ * @param tableMetadata
+ * @param dbName
+ * @param tableName
+ * @param topicName
+ * @param highWatermark
+ * @throws IOException
+ */
+ protected void postCommit(
+ TableMetadata tableMetadata,
+ String dbName,
+ String tableName,
+ String topicName,
+ long highWatermark) throws IOException {
+ // Emit GTE for snapshot commits
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ Map<String, String> currentProps = tableMetadata.table.get().properties();
+ try (Timer.Context context = new Timer().time()) {
+ submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName,
currentProps, highWatermark);
+ log.info("Sending snapshot commit event for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
+
+ try (Timer.Context context = new Timer().time()) {
+ sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
Review Comment:
There are two ways to think about the ordering:
1. The Kafka audit count is easier to check, and downstream users are more
likely to use this number, so we should prioritize emitting it.
2. The Gobblin commit GTE is what we use for Gobblin monitoring, so we should
prioritize our own monitoring.
Practically speaking, the most common use case is the first one (as you
suggest), so I think it's a good idea to emit the audit count first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]