[
https://issues.apache.org/jira/browse/GOBBLIN-1960?focusedWorklogId=891673&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-891673
]
ASF GitHub Bot logged work on GOBBLIN-1960:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 22/Nov/23 00:49
Start Date: 22/Nov/23 00:49
Worklog Time Spent: 10m
Work Description: ZihanLi58 commented on code in PR #3833:
URL: https://github.com/apache/gobblin/pull/3833#discussion_r1401376524
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -834,86 +834,115 @@ protected String getTopicName(TableIdentifier tid,
TableMetadata tableMetadata)
public void flush(String dbName, String tableName) throws IOException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
+ boolean transactionCommitted = false;
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new
TableMetadata(this.conf));
- if (tableMetadata.transaction.isPresent()) {
- Transaction transaction = tableMetadata.transaction.get();
- Map<String, String> props = tableMetadata.newProperties.or(
-
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
- //Set data offset range
- setDatasetOffsetRange(tableMetadata, props);
- String topicName = getTopicName(tid, tableMetadata);
- if (tableMetadata.appendFiles.isPresent()) {
- tableMetadata.appendFiles.get().commit();
- try (Timer.Context context = new Timer().time()) {
- sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
- log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
- if (tableMetadata.completenessEnabled) {
- updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
- }
- if (tableMetadata.deleteFiles.isPresent()) {
- tableMetadata.deleteFiles.get().commit();
- }
- // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
- // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
- if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
- && tableMetadata.completenessEnabled) {
- updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
+ if (!tableMetadata.transaction.isPresent()) {
+ log.info("There's no transaction initiated for the table {}", tid);
+ return;
+ }
- //Set high waterMark
- Long highWatermark = tableCurrentWatermarkMap.get(tid);
- props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
- //Set low waterMark
- props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
- tableMetadata.lowWatermark.get().toString());
- //Set whether to delete metadata files after commit
- if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
- props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
-
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
- props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
- conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
- }
- //Update schema(commit)
- updateSchema(tableMetadata, props, topicName);
- //Update properties
- UpdateProperties updateProperties = transaction.updateProperties();
- props.forEach(updateProperties::set);
- updateProperties.commit();
- try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
- Timer.Context context = new Timer().time()) {
- transaction.commitTransaction();
- log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ //Set data offset range
+ setDatasetOffsetRange(tableMetadata, props);
+ String topicName = getTopicName(tid, tableMetadata);
+ if (tableMetadata.appendFiles.isPresent()) {
+ tableMetadata.appendFiles.get().commit();
+ if (tableMetadata.completenessEnabled) {
+ updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
}
+ }
- // Emit GTE for snapshot commits
- Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
- Map<String, String> currentProps =
tableMetadata.table.get().properties();
- try (Timer.Context context = new Timer().time()) {
- submitSnapshotCommitEvent(snapshot, tableMetadata, dbName,
tableName, currentProps, highWatermark);
- log.info("Sending snapshot commit event for {} took {} ms",
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
+ if (tableMetadata.deleteFiles.isPresent()) {
+ tableMetadata.deleteFiles.get().commit();
+ }
+ // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
+ // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
+ if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
+ && tableMetadata.completenessEnabled) {
+ updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
+ }
- //Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark);
- log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid));
- } else {
- log.info("There's no transaction initiated for the table {}", tid);
+ //Set high waterMark
+ Long highWatermark = tableCurrentWatermarkMap.get(tid);
+ props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+ //Set low waterMark
+ props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
+ tableMetadata.lowWatermark.get().toString());
+ //Set whether to delete metadata files after commit
+ if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+ props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
+
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+ props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
+ conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+ }
+ //Update schema(commit)
+ updateSchema(tableMetadata, props, topicName);
+ //Update properties
+ UpdateProperties updateProperties = transaction.updateProperties();
+ props.forEach(updateProperties::set);
+ updateProperties.commit();
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
+ Timer.Context context = new Timer().time()) {
+ transaction.commitTransaction();
+ log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ transactionCommitted = true;
}
- } catch (RuntimeException e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+
+ postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+ //Reset the table metadata for next accumulation period
+ Map<String, String> currentProps =
tableMetadata.table.get().properties();
Review Comment:
Why not put these lines in postCommit?
##########
gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java:
##########
@@ -834,86 +834,115 @@ protected String getTopicName(TableIdentifier tid,
TableMetadata tableMetadata)
public void flush(String dbName, String tableName) throws IOException {
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
+ boolean transactionCommitted = false;
try {
TableIdentifier tid = TableIdentifier.of(dbName, tableName);
TableMetadata tableMetadata = tableMetadataMap.getOrDefault(tid, new
TableMetadata(this.conf));
- if (tableMetadata.transaction.isPresent()) {
- Transaction transaction = tableMetadata.transaction.get();
- Map<String, String> props = tableMetadata.newProperties.or(
-
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
- //Set data offset range
- setDatasetOffsetRange(tableMetadata, props);
- String topicName = getTopicName(tid, tableMetadata);
- if (tableMetadata.appendFiles.isPresent()) {
- tableMetadata.appendFiles.get().commit();
- try (Timer.Context context = new Timer().time()) {
- sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
- log.info("Sending audit counts for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
- if (tableMetadata.completenessEnabled) {
- updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
- }
- if (tableMetadata.deleteFiles.isPresent()) {
- tableMetadata.deleteFiles.get().commit();
- }
- // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
- // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
- if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
- && tableMetadata.completenessEnabled) {
- updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
- tableMetadata.totalCountCompletenessEnabled);
- }
+ if (!tableMetadata.transaction.isPresent()) {
+ log.info("There's no transaction initiated for the table {}", tid);
+ return;
+ }
- //Set high waterMark
- Long highWatermark = tableCurrentWatermarkMap.get(tid);
- props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
- //Set low waterMark
- props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
- tableMetadata.lowWatermark.get().toString());
- //Set whether to delete metadata files after commit
- if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
- props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
-
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
- props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
- conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
- }
- //Update schema(commit)
- updateSchema(tableMetadata, props, topicName);
- //Update properties
- UpdateProperties updateProperties = transaction.updateProperties();
- props.forEach(updateProperties::set);
- updateProperties.commit();
- try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
- Timer.Context context = new Timer().time()) {
- transaction.commitTransaction();
- log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ Transaction transaction = tableMetadata.transaction.get();
+ Map<String, String> props = tableMetadata.newProperties.or(
+
Maps.newHashMap(tableMetadata.lastProperties.or(getIcebergTable(tid).properties())));
+ //Set data offset range
+ setDatasetOffsetRange(tableMetadata, props);
+ String topicName = getTopicName(tid, tableMetadata);
+ if (tableMetadata.appendFiles.isPresent()) {
+ tableMetadata.appendFiles.get().commit();
+ if (tableMetadata.completenessEnabled) {
+ updateWatermarkWithFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
}
+ }
- // Emit GTE for snapshot commits
- Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
- Map<String, String> currentProps =
tableMetadata.table.get().properties();
- try (Timer.Context context = new Timer().time()) {
- submitSnapshotCommitEvent(snapshot, tableMetadata, dbName,
tableName, currentProps, highWatermark);
- log.info("Sending snapshot commit event for {} took {} ms",
topicName, TimeUnit.NANOSECONDS.toMillis(context.stop()));
- }
+ if (tableMetadata.deleteFiles.isPresent()) {
+ tableMetadata.deleteFiles.get().commit();
+ }
+ // Check and update completion watermark when there are no files to be
registered, typically for quiet topics
+ // The logic is to check the window [currentHour-1,currentHour] and
update the watermark if there are no audit counts
+ if(!tableMetadata.appendFiles.isPresent() &&
!tableMetadata.deleteFiles.isPresent()
+ && tableMetadata.completenessEnabled) {
+ updateWatermarkWithNoFilesRegistered(topicName, tableMetadata, props,
+ tableMetadata.totalCountCompletenessEnabled);
+ }
- //Reset the table metadata for next accumulation period
- tableMetadata.reset(currentProps, highWatermark);
- log.info(String.format("Finish commit of new snapshot %s for table
%s", snapshot.snapshotId(), tid));
- } else {
- log.info("There's no transaction initiated for the table {}", tid);
+ //Set high waterMark
+ Long highWatermark = tableCurrentWatermarkMap.get(tid);
+ props.put(String.format(GMCE_HIGH_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)), highWatermark.toString());
+ //Set low waterMark
+ props.put(String.format(GMCE_LOW_WATERMARK_KEY,
tableTopicPartitionMap.get(tid)),
+ tableMetadata.lowWatermark.get().toString());
+ //Set whether to delete metadata files after commit
+ if (conf.getBoolean(ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY,
DEFAULT_ICEBERG_ENABLE_CUSTOM_METADATA_RETENTION_POLICY)) {
+ props.put(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
Boolean.toString(
+
conf.getBoolean(TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED,
TableProperties.METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULT)));
+ props.put(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
Integer.toString(
+ conf.getInt(TableProperties.METADATA_PREVIOUS_VERSIONS_MAX,
TableProperties.METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT)));
+ }
+ //Update schema(commit)
+ updateSchema(tableMetadata, props, topicName);
+ //Update properties
+ UpdateProperties updateProperties = transaction.updateProperties();
+ props.forEach(updateProperties::set);
+ updateProperties.commit();
+ try (AutoCloseableHiveLock lock = this.locks.getTableLock(dbName,
tableName);
+ Timer.Context context = new Timer().time()) {
+ transaction.commitTransaction();
+ log.info("Committing transaction for table {} took {} ms", tid,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ transactionCommitted = true;
}
- } catch (RuntimeException e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+
+ postCommit(tableMetadata, dbName, tableName, topicName, highWatermark);
+ //Reset the table metadata for next accumulation period
+ Map<String, String> currentProps =
tableMetadata.table.get().properties();
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ tableMetadata.reset(currentProps, highWatermark);
+ log.info(String.format("Finish commit of new snapshot %s for table %s",
snapshot.snapshotId(), tid));
} catch (Exception e) {
- throw new IOException(String.format("Fail to flush table %s %s", dbName,
tableName), e);
+ throw new IOException(String.format("Failed to flush table %s %s.
transactionCommitted=%s",
+ dbName, tableName, transactionCommitted), e);
} finally {
writeLock.unlock();
}
}
+ /**
+ * PostCommit operation that executes after the transaction is committed to
the Iceberg table. Operations in this
+ * method are considered non-critical to the transaction and will not cause
the transaction to fail if they fail,
+ * but should ideally still be executed for observability.
+ *
+ * One example of this is observability events / metrics like {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}.
+ * The default behavior is to emit a GTE for the commit event and a kafka
audit event
+ *
+ * @param tableMetadata
+ * @param dbName
+ * @param tableName
+ * @param topicName
+ * @param highWatermark
+ * @throws IOException
+ */
+ protected void postCommit(
+ TableMetadata tableMetadata,
+ String dbName,
+ String tableName,
+ String topicName,
+ long highWatermark) throws IOException {
+ // Emit GTE for snapshot commits
+ Snapshot snapshot = tableMetadata.table.get().currentSnapshot();
+ Map<String, String> currentProps = tableMetadata.table.get().properties();
+ try (Timer.Context context = new Timer().time()) {
+ submitSnapshotCommitEvent(snapshot, tableMetadata, dbName, tableName,
currentProps, highWatermark);
+ log.info("Sending snapshot commit event for {} took {} ms", topicName,
TimeUnit.NANOSECONDS.toMillis(context.stop()));
+ }
+
+ try (Timer.Context context = new Timer().time()) {
+ sendAuditCounts(topicName, tableMetadata.serializedAuditCountMaps);
Review Comment:
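Let's try sending the audit counts first (before the snapshot commit event), so that a failure while emitting the event does not skip them — this avoids undercounting as much as possible.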
Issue Time Tracking
-------------------
Worklog Id: (was: 891673)
Remaining Estimate: 0h
Time Spent: 10m
> Emit audit count after commit in IcebergMetadataWriter
> ------------------------------------------------------
>
> Key: GOBBLIN-1960
> URL: https://issues.apache.org/jira/browse/GOBBLIN-1960
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Matthew Ho
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)