phet commented on code in PR #3876:
URL: https://github.com/apache/gobblin/pull/3876#discussion_r1482319134


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -65,26 +67,40 @@ public boolean isCompleted() throws IOException {
 
   @Override
   public void execute() throws IOException {
-    IcebergTable destIcebergTable = 
IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION)
-        .openTable(TableIdentifier.parse(destTableIdStr));
-    // NOTE: solely by-product of probing table's existence: metadata recorded 
just prior to reading from source catalog is what's actually used
-    TableMetadata unusedNowCurrentDestMetadata = null;
+    IcebergTable destIcebergTable = 
createDestinationCatalog().openTable(TableIdentifier.parse(destTableIdStr));
     try {
-      unusedNowCurrentDestMetadata = destIcebergTable.accessTableMetadata(); 
// probe... (first access could throw)
-      log.info("~{}~ (destination) (using) TableMetadata: {} - {} {}= 
(current) TableMetadata: {} - {}",
-          destTableIdStr,
+      TableMetadata currentDestMetadata = 
destIcebergTable.accessTableMetadata(); // probe... (first access could throw)
+      // CRITICAL: verify current dest-side metadata remains the same as 
observed just prior to first loading source catalog table metadata
+      boolean isJustPriorDestMetadataStillCurrent = 
currentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid())
+          && 
currentDestMetadata.metadataFileLocation().equals(justPriorDestTableMetadata.metadataFileLocation());
+      String determinationMsg = String.format("(just prior) TableMetadata: {} 
- {} {}= (current) TableMetadata: {} - {}",
           justPriorDestTableMetadata.uuid(), 
justPriorDestTableMetadata.metadataFileLocation(),
-          
unusedNowCurrentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid()) ? 
"=" : "!",
-          unusedNowCurrentDestMetadata.uuid(), 
unusedNowCurrentDestMetadata.metadataFileLocation());
+          isJustPriorDestMetadataStillCurrent ? "=" : "!",
+          currentDestMetadata.uuid(), 
currentDestMetadata.metadataFileLocation());
+      log.info("~{}~ [destination] {}", destTableIdStr, determinationMsg);
+
+      // NOTE: we originally expected the dest-side catalog to enforce this 
match, but the client-side `BaseMetastoreTableOperations.commit`
+      // uses `==`, rather than `.equals` (value-cmp), and that invariably 
leads to:
+      //   org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: 
stale table metadata
+      if (!isJustPriorDestMetadataStillCurrent) {
+        throw new IOException("error: likely concurrent writing to 
destination: " + determinationMsg);

Review Comment:
   this can happen if someone other than replication committed between this 
job's initial "work discovery" (WU planning) and this moment.  it can also 
happen if another replication executed concurrently (not really a good idea!) 
and that first execution already committed successfully to this table.  prior 
to this fix, interleaved dest-side commits went undetected, which could result 
in undefined behavior.  the fix here merely adds detection with rejection; it 
does not attempt to reconcile or retry.
   
   remediation is to restart replication from the beginning, so that work 
discovery (WU planning) begins afresh.  the most practical way to do that is 
simply to launch another execution.
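
   for illustration, here is a minimal, self-contained sketch of the 
"detection with rejection" idea described above.  `MetadataSnapshot` and 
`StaleDestinationCheck` are hypothetical stand-ins (not Gobblin or Iceberg 
classes) that hold only the two values the diff compares: the table UUID and 
the metadata file location.  the sketch formats its message with `%s`, since 
`String.format` does not substitute SLF4J-style `{}` placeholders.

```java
import java.io.IOException;
import java.util.Objects;

/** Hypothetical stand-in for the two TableMetadata values the check compares. */
final class MetadataSnapshot {
  final String uuid;
  final String metadataFileLocation;

  MetadataSnapshot(String uuid, String metadataFileLocation) {
    this.uuid = uuid;
    this.metadataFileLocation = metadataFileLocation;
  }
}

/** Hypothetical helper: rejects a commit when the destination changed underneath us. */
final class StaleDestinationCheck {
  private StaleDestinationCheck() {}

  /** True only if both the UUID and the metadata file location are value-equal. */
  static boolean isStillCurrent(MetadataSnapshot justPrior, MetadataSnapshot current) {
    return Objects.equals(current.uuid, justPrior.uuid)
        && Objects.equals(current.metadataFileLocation, justPrior.metadataFileLocation);
  }

  /** Throws IOException if the destination metadata no longer matches what was observed earlier. */
  static void verifyUnchanged(String tableId, MetadataSnapshot justPrior, MetadataSnapshot current)
      throws IOException {
    boolean stillCurrent = isStillCurrent(justPrior, current);
    // String.format uses %s placeholders ("{}" would be emitted literally)
    String determinationMsg = String.format(
        "(just prior) TableMetadata: %s - %s %s= (current) TableMetadata: %s - %s",
        justPrior.uuid, justPrior.metadataFileLocation,
        stillCurrent ? "=" : "!",
        current.uuid, current.metadataFileLocation);
    if (!stillCurrent) {
      // detection only: the caller is expected to restart replication from work discovery
      throw new IOException("error: likely concurrent writing to destination (" + tableId + "): "
          + determinationMsg);
    }
  }
}
```

   the check deliberately fails fast rather than retrying in place: the work 
units were planned against the earlier destination state, so only a fresh 
execution can re-plan against the new one.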


