phet commented on code in PR #3876:
URL: https://github.com/apache/gobblin/pull/3876#discussion_r1482319134
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -65,26 +67,40 @@ public boolean isCompleted() throws IOException {
   @Override
   public void execute() throws IOException {
-    IcebergTable destIcebergTable = IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION)
-        .openTable(TableIdentifier.parse(destTableIdStr));
-    // NOTE: solely by-product of probing table's existence: metadata recorded just prior to reading from source catalog is what's actually used
-    TableMetadata unusedNowCurrentDestMetadata = null;
+    IcebergTable destIcebergTable = createDestinationCatalog().openTable(TableIdentifier.parse(destTableIdStr));
     try {
-      unusedNowCurrentDestMetadata = destIcebergTable.accessTableMetadata(); // probe... (first access could throw)
-      log.info("~{}~ (destination) (using) TableMetadata: {} - {} {}= (current) TableMetadata: {} - {}",
-          destTableIdStr,
+      TableMetadata currentDestMetadata = destIcebergTable.accessTableMetadata(); // probe... (first access could throw)
+      // CRITICAL: verify current dest-side metadata remains the same as observed just prior to first loading source catalog table metadata
+      boolean isJustPriorDestMetadataStillCurrent = currentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid())
+          && currentDestMetadata.metadataFileLocation().equals(justPriorDestTableMetadata.metadataFileLocation());
+      String determinationMsg = String.format("(just prior) TableMetadata: {} - {} {}= (current) TableMetadata: {} - {}",
           justPriorDestTableMetadata.uuid(), justPriorDestTableMetadata.metadataFileLocation(),
-          unusedNowCurrentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid()) ? "=" : "!",
-          unusedNowCurrentDestMetadata.uuid(), unusedNowCurrentDestMetadata.metadataFileLocation());
+          isJustPriorDestMetadataStillCurrent ? "=" : "!",
+          currentDestMetadata.uuid(), currentDestMetadata.metadataFileLocation());
+      log.info("~{}~ [destination] {}", destTableIdStr, determinationMsg);
+
+      // NOTE: we originally expected the dest-side catalog to enforce this match, but the client-side `BaseMetastoreTableOperations.commit`
+      // uses `==`, rather than `.equals` (value-cmp), and that invariably leads to:
+      //   org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: stale table metadata
+      if (!isJustPriorDestMetadataStillCurrent) {
+        throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
Review Comment:
This can happen if someone other than replication committed to the
destination table between this job's initial "work discovery" (WU planning)
and this moment. It can also happen if another replication executed
concurrently (not really a good idea!) and that execution already committed
successfully to this table. Prior to this fix, interleaved dest-side commits
went undetected, which could result in undefined behavior. The fix here
merely adds detection with rejection. Remediation is to restart replication
from the beginning, so that work discovery (WU planning) begins afresh. The
most practical way to do that is simply to launch another execution.
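
To illustrate "detection with rejection" in isolation, here is a minimal
sketch, not the code in this PR. It assumes a hypothetical `MetadataRef`
holder standing in for the two `TableMetadata` fields the check compares
(UUID and metadata file location); `StaleDestinationCheck`, `isStillCurrent`
and `verifyOrReject` are likewise illustrative names only.

```java
import java.io.IOException;
import java.util.Objects;

/** Hypothetical stand-in for the two TableMetadata fields the check compares. */
final class MetadataRef {
  final String uuid;
  final String metadataFileLocation;

  MetadataRef(String uuid, String metadataFileLocation) {
    this.uuid = uuid;
    this.metadataFileLocation = metadataFileLocation;
  }
}

/** Illustrative helper: detect an interleaved dest-side commit and reject. */
final class StaleDestinationCheck {
  private StaleDestinationCheck() {}

  /** True only when both the table UUID and the metadata file location are unchanged. */
  static boolean isStillCurrent(MetadataRef justPrior, MetadataRef current) {
    // Value comparison (not reference `==`) on both identifying fields.
    return Objects.equals(current.uuid, justPrior.uuid)
        && Objects.equals(current.metadataFileLocation, justPrior.metadataFileLocation);
  }

  /** Throws when the destination changed after the "just prior" snapshot was taken. */
  static void verifyOrReject(MetadataRef justPrior, MetadataRef current) throws IOException {
    if (!isStillCurrent(justPrior, current)) {
      // String.format expands %s specifiers; SLF4J-style {} is only expanded by the logger.
      throw new IOException(String.format(
          "likely concurrent writing to destination: (just prior) %s - %s != (current) %s - %s",
          justPrior.uuid, justPrior.metadataFileLocation,
          current.uuid, current.metadataFileLocation));
    }
  }
}
```

Under these assumptions, a caller that sees the `IOException` would not retry
the commit in place. It would launch a fresh execution so that work discovery
re-reads the destination's current metadata.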