This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e324d0b8 [GOBBLIN-1994] Fix iceberg-distcp dest-side `TableMetadata` consistency check to accurately detect "stale table metadata" (#3876)
0e324d0b8 is described below

commit 0e324d0b84b92174c5e4feee0f80dfd9afa19a13
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Feb 7 20:03:06 2024 -0800

    [GOBBLIN-1994] Fix iceberg-distcp dest-side `TableMetadata` consistency check to accurately detect "stale table metadata" (#3876)
    
    [GOBBLIN-1994] Fix iceberg-distcp dest-side `TableMetadata` consistency check to accurately detect "stale table metadata"
---
 .../copy/iceberg/IcebergRegisterStep.java          |  40 +++++---
 .../copy/iceberg/IcebergDatasetTest.java           |   5 +-
 .../copy/iceberg/IcebergRegisterStepTest.java      | 103 +++++++++++++++++++++
 3 files changed, 133 insertions(+), 15 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index 7345e9a00..ce7d11962 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.data.management.copy.iceberg;
 import java.io.IOException;
 import java.util.Properties;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.TableIdentifier;
 
@@ -65,26 +67,40 @@ public class IcebergRegisterStep implements CommitStep {
 
   @Override
   public void execute() throws IOException {
-    IcebergTable destIcebergTable = 
IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION)
-        .openTable(TableIdentifier.parse(destTableIdStr));
-    // NOTE: solely by-product of probing table's existence: metadata recorded 
just prior to reading from source catalog is what's actually used
-    TableMetadata unusedNowCurrentDestMetadata = null;
+    IcebergTable destIcebergTable = 
createDestinationCatalog().openTable(TableIdentifier.parse(destTableIdStr));
     try {
-      unusedNowCurrentDestMetadata = destIcebergTable.accessTableMetadata(); 
// probe... (first access could throw)
-      log.info("~{}~ (destination) (using) TableMetadata: {} - {} {}= 
(current) TableMetadata: {} - {}",
-          destTableIdStr,
+      TableMetadata currentDestMetadata = 
destIcebergTable.accessTableMetadata(); // probe... (first access could throw)
+      // CRITICAL: verify current dest-side metadata remains the same as 
observed just prior to first loading source catalog table metadata
+      boolean isJustPriorDestMetadataStillCurrent = 
currentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid())
+          && 
currentDestMetadata.metadataFileLocation().equals(justPriorDestTableMetadata.metadataFileLocation());
+      String determinationMsg = String.format("(just prior) TableMetadata: %s 
- %s %s= (current) TableMetadata: %s - %s",
           justPriorDestTableMetadata.uuid(), 
justPriorDestTableMetadata.metadataFileLocation(),
-          
unusedNowCurrentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid()) ? 
"=" : "!",
-          unusedNowCurrentDestMetadata.uuid(), 
unusedNowCurrentDestMetadata.metadataFileLocation());
+          isJustPriorDestMetadataStillCurrent ? "=" : "!",
+          currentDestMetadata.uuid(), 
currentDestMetadata.metadataFileLocation());
+      log.info("~{}~ [destination] {}", destTableIdStr, determinationMsg);
+
+      // NOTE: we originally expected the dest-side catalog to enforce this 
match, but the client-side `BaseMetastoreTableOperations.commit`
+      // uses `==`, rather than `.equals` (value-cmp), and that invariably 
leads to:
+      //   org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: 
stale table metadata
+      if (!isJustPriorDestMetadataStillCurrent) {
+        throw new IOException("error: likely concurrent writing to 
destination: " + determinationMsg);
+      }
+      destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
currentDestMetadata);
     } catch (IcebergTable.TableNotFoundException tnfe) {
-      log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
+      String msg = "Destination table (with TableMetadata) does not exist: " + 
tnfe.getMessage();
+      log.error(msg);
+      throw new IOException(msg, tnfe);
     }
-    // TODO: decide whether helpful to construct a more detailed error message 
about `justPriorDestTableMetadata` being no-longer current
-    destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
justPriorDestTableMetadata);
   }
 
   @Override
   public String toString() {
     return String.format("Registering Iceberg Table: {%s} (dest); (src: 
{%s})", this.destTableIdStr, this.srcTableIdStr);
   }
+
+  /** Purely because the static `IcebergDatasetFinder.createIcebergCatalog` 
proved challenging to mock, even w/ `Mockito::mockStatic` */
+  @VisibleForTesting
+  protected IcebergCatalog createDestinationCatalog() throws IOException {
+    return IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION);
+  }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index c5e20ed72..b9babbc88 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -59,7 +59,6 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 
-import joptsimple.internal.Strings;
 import lombok.Data;
 
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
@@ -684,8 +683,8 @@ public class IcebergDatasetTest {
     private static FileOwnerAndPermissions 
getFileOwnerAndPermissions(JsonObject jsonObject) {
       FileOwnerAndPermissions fileOwnerAndPermissions = new 
FileOwnerAndPermissions();
       JsonObject objData = jsonObject.getAsJsonObject("object-data");
-      fileOwnerAndPermissions.owner = objData.has("owner") ? 
objData.getAsJsonPrimitive("owner").getAsString() : Strings.EMPTY;
-      fileOwnerAndPermissions.group = objData.has("group") ? 
objData.getAsJsonPrimitive("group").getAsString() : Strings.EMPTY;
+      fileOwnerAndPermissions.owner = objData.has("owner") ? 
objData.getAsJsonPrimitive("owner").getAsString() : "";
+      fileOwnerAndPermissions.group = objData.has("group") ? 
objData.getAsJsonPrimitive("group").getAsString() : "";
 
       JsonObject fsPermission = objData.has("fsPermission") ? 
objData.getAsJsonObject("fsPermission") : null;
       if (fsPermission != null) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
new file mode 100644
index 000000000..5e5f2eaa6
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep} */
+public class IcebergRegisterStepTest {
+
+  private TableIdentifier srcTableId = TableIdentifier.of("db", "foo");
+  private TableIdentifier destTableId = TableIdentifier.of("db", "bar");
+
+  @Test
+  public void testDestSideMetadataMatchSucceeds() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = 
Mockito.mock(TableMetadata.class); // (no mocked behavior)
+    IcebergTable mockTable = mockIcebergTable("foo", "bar"); // matches!
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, 
readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws 
IOException {
+            return mockSingleTableIcebergCatalog(mockTable);
+          }
+        };
+    try {
+      regStep.execute();
+      Mockito.verify(mockTable, Mockito.times(1)).registerIcebergTable(any(), 
any());
+    } catch (IOException ioe) {
+      Assert.fail("got unexpected IOException", ioe);
+    }
+  }
+
+  @Test
+  public void testDestSideMetadataMismatchThrowsIOException() throws 
IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = 
Mockito.mock(TableMetadata.class); // (no mocked behavior)
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, 
readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws 
IOException {
+            return mockSingleTableIcebergCatalog("foo", "notBar");
+          }
+        };
+    try {
+      regStep.execute();
+      Assert.fail("expected IOException");
+    } catch (IOException ioe) {
+      Assert.assertTrue(ioe.getMessage().startsWith("error: likely concurrent 
writing to destination"), ioe.getMessage());
+    }
+  }
+
+  protected TableMetadata mockTableMetadata(String uuid, String 
metadataFileLocation) throws IOException {
+    TableMetadata mockMetadata = Mockito.mock(TableMetadata.class);
+    Mockito.when(mockMetadata.uuid()).thenReturn(uuid);
+    
Mockito.when(mockMetadata.metadataFileLocation()).thenReturn(metadataFileLocation);
+    return mockMetadata;
+  }
+
+  protected IcebergTable mockIcebergTable(String tableUuid, String 
tableMetadataFileLocation) throws IOException {
+    TableMetadata mockMetadata = mockTableMetadata(tableUuid, 
tableMetadataFileLocation);
+
+    IcebergTable mockTable = Mockito.mock(IcebergTable.class);
+    Mockito.when(mockTable.accessTableMetadata()).thenReturn(mockMetadata);
+    return mockTable;
+  }
+
+  protected IcebergCatalog mockSingleTableIcebergCatalog(String tableUuid, 
String tableMetadataFileLocation) throws IOException {
+    IcebergTable mockTable = mockIcebergTable(tableUuid, 
tableMetadataFileLocation);
+    return mockSingleTableIcebergCatalog(mockTable);
+  }
+
+  protected IcebergCatalog mockSingleTableIcebergCatalog(IcebergTable 
mockTable) throws IOException {
+    IcebergCatalog catalog = Mockito.mock(IcebergCatalog.class);
+    Mockito.when(catalog.openTable(any())).thenReturn(mockTable);
+    return catalog;
+  }
+}

Reply via email to