This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0e324d0b8 [GOBBLIN-1994] Fix iceberg-distcp dest-side `TableMetadata`
consistency check to accurately detect "stale table metadata" (#3876)
0e324d0b8 is described below
commit 0e324d0b84b92174c5e4feee0f80dfd9afa19a13
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Feb 7 20:03:06 2024 -0800
[GOBBLIN-1994] Fix iceberg-distcp dest-side `TableMetadata` consistency
check to accurately detect "stale table metadata" (#3876)
[GOBBLIN-1994] Fix iceberg-distcp dest-side `TableMetadata` consistency
check to accurately detect "stale table metadata"
---
.../copy/iceberg/IcebergRegisterStep.java | 40 +++++---
.../copy/iceberg/IcebergDatasetTest.java | 5 +-
.../copy/iceberg/IcebergRegisterStepTest.java | 103 +++++++++++++++++++++
3 files changed, 133 insertions(+), 15 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index 7345e9a00..ce7d11962 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -20,6 +20,8 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import java.util.Properties;
+import com.google.common.annotations.VisibleForTesting;
+
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.TableIdentifier;
@@ -65,26 +67,40 @@ public class IcebergRegisterStep implements CommitStep {
@Override
public void execute() throws IOException {
- IcebergTable destIcebergTable =
IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION)
- .openTable(TableIdentifier.parse(destTableIdStr));
- // NOTE: solely by-product of probing table's existence: metadata recorded
just prior to reading from source catalog is what's actually used
- TableMetadata unusedNowCurrentDestMetadata = null;
+ IcebergTable destIcebergTable =
createDestinationCatalog().openTable(TableIdentifier.parse(destTableIdStr));
try {
- unusedNowCurrentDestMetadata = destIcebergTable.accessTableMetadata();
// probe... (first access could throw)
- log.info("~{}~ (destination) (using) TableMetadata: {} - {} {}=
(current) TableMetadata: {} - {}",
- destTableIdStr,
+ TableMetadata currentDestMetadata =
destIcebergTable.accessTableMetadata(); // probe... (first access could throw)
+ // CRITICAL: verify current dest-side metadata remains the same as
observed just prior to first loading source catalog table metadata
+ boolean isJustPriorDestMetadataStillCurrent =
currentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid())
+ &&
currentDestMetadata.metadataFileLocation().equals(justPriorDestTableMetadata.metadataFileLocation());
+ String determinationMsg = String.format("(just prior) TableMetadata: %s
- %s %s= (current) TableMetadata: %s - %s",
justPriorDestTableMetadata.uuid(),
justPriorDestTableMetadata.metadataFileLocation(),
-
unusedNowCurrentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid()) ?
"=" : "!",
- unusedNowCurrentDestMetadata.uuid(),
unusedNowCurrentDestMetadata.metadataFileLocation());
+ isJustPriorDestMetadataStillCurrent ? "=" : "!",
+ currentDestMetadata.uuid(),
currentDestMetadata.metadataFileLocation());
+ log.info("~{}~ [destination] {}", destTableIdStr, determinationMsg);
+
+ // NOTE: we originally expected the dest-side catalog to enforce this
match, but the client-side `BaseMetastoreTableOperations.commit`
+ // uses `==` (reference identity), rather than `.equals` (value comparison), and that invariably
leads to:
+ // org.apache.iceberg.exceptions.CommitFailedException: Cannot commit:
stale table metadata
+ if (!isJustPriorDestMetadataStillCurrent) {
+ throw new IOException("error: likely concurrent writing to
destination: " + determinationMsg);
+ }
+ destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata,
currentDestMetadata);
} catch (IcebergTable.TableNotFoundException tnfe) {
- log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
+ String msg = "Destination table (with TableMetadata) does not exist: " +
tnfe.getMessage();
+ log.error(msg);
+ throw new IOException(msg, tnfe);
}
- // TODO: decide whether helpful to construct a more detailed error message
about `justPriorDestTableMetadata` being no-longer current
- destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata,
justPriorDestTableMetadata);
}
@Override
public String toString() {
return String.format("Registering Iceberg Table: {%s} (dest); (src:
{%s})", this.destTableIdStr, this.srcTableIdStr);
}
+
+ /** Creates the destination-side catalog; kept as an overridable method purely because the static `IcebergDatasetFinder.createIcebergCatalog`
proved challenging to mock, even with `Mockito::mockStatic` */
+ @VisibleForTesting
+ protected IcebergCatalog createDestinationCatalog() throws IOException {
+ return IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION);
+ }
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index c5e20ed72..b9babbc88 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -59,7 +59,6 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import joptsimple.internal.Strings;
import lombok.Data;
import org.apache.gobblin.data.management.copy.CopyConfiguration;
@@ -684,8 +683,8 @@ public class IcebergDatasetTest {
private static FileOwnerAndPermissions
getFileOwnerAndPermissions(JsonObject jsonObject) {
FileOwnerAndPermissions fileOwnerAndPermissions = new
FileOwnerAndPermissions();
JsonObject objData = jsonObject.getAsJsonObject("object-data");
- fileOwnerAndPermissions.owner = objData.has("owner") ?
objData.getAsJsonPrimitive("owner").getAsString() : Strings.EMPTY;
- fileOwnerAndPermissions.group = objData.has("group") ?
objData.getAsJsonPrimitive("group").getAsString() : Strings.EMPTY;
+ fileOwnerAndPermissions.owner = objData.has("owner") ?
objData.getAsJsonPrimitive("owner").getAsString() : "";
+ fileOwnerAndPermissions.group = objData.has("group") ?
objData.getAsJsonPrimitive("group").getAsString() : "";
JsonObject fsPermission = objData.has("fsPermission") ?
objData.getAsJsonObject("fsPermission") : null;
if (fsPermission != null) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
new file mode 100644
index 000000000..5e5f2eaa6
--- /dev/null
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
+
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+
+/** Tests for {@link
org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep} */
+public class IcebergRegisterStepTest {
+
+ private TableIdentifier srcTableId = TableIdentifier.of("db", "foo");
+ private TableIdentifier destTableId = TableIdentifier.of("db", "bar");
+
+ @Test
+ public void testDestSideMetadataMatchSucceeds() throws IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata =
Mockito.mock(TableMetadata.class); // (no mocked behavior)
+ IcebergTable mockTable = mockIcebergTable("foo", "bar"); // matches!
+ IcebergRegisterStep regStep =
+ new IcebergRegisterStep(srcTableId, destTableId,
readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+ @Override
+ protected IcebergCatalog createDestinationCatalog() throws
IOException {
+ return mockSingleTableIcebergCatalog(mockTable);
+ }
+ };
+ try {
+ regStep.execute();
+ Mockito.verify(mockTable, Mockito.times(1)).registerIcebergTable(any(),
any());
+ } catch (IOException ioe) {
+ Assert.fail("got unexpected IOException", ioe);
+ }
+ }
+
+ @Test
+ public void testDestSideMetadataMismatchThrowsIOException() throws
IOException {
+ TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+ TableMetadata readTimeSrcTableMetadata =
Mockito.mock(TableMetadata.class); // (no mocked behavior)
+ IcebergRegisterStep regStep =
+ new IcebergRegisterStep(srcTableId, destTableId,
readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+ @Override
+ protected IcebergCatalog createDestinationCatalog() throws
IOException {
+ return mockSingleTableIcebergCatalog("foo", "notBar");
+ }
+ };
+ try {
+ regStep.execute();
+ Assert.fail("expected IOException");
+ } catch (IOException ioe) {
+ Assert.assertTrue(ioe.getMessage().startsWith("error: likely concurrent
writing to destination"), ioe.getMessage());
+ }
+ }
+
+ protected TableMetadata mockTableMetadata(String uuid, String
metadataFileLocation) throws IOException {
+ TableMetadata mockMetadata = Mockito.mock(TableMetadata.class);
+ Mockito.when(mockMetadata.uuid()).thenReturn(uuid);
+
Mockito.when(mockMetadata.metadataFileLocation()).thenReturn(metadataFileLocation);
+ return mockMetadata;
+ }
+
+ protected IcebergTable mockIcebergTable(String tableUuid, String
tableMetadataFileLocation) throws IOException {
+ TableMetadata mockMetadata = mockTableMetadata(tableUuid,
tableMetadataFileLocation);
+
+ IcebergTable mockTable = Mockito.mock(IcebergTable.class);
+ Mockito.when(mockTable.accessTableMetadata()).thenReturn(mockMetadata);
+ return mockTable;
+ }
+
+ protected IcebergCatalog mockSingleTableIcebergCatalog(String tableUuid,
String tableMetadataFileLocation) throws IOException {
+ IcebergTable mockTable = mockIcebergTable(tableUuid,
tableMetadataFileLocation);
+ return mockSingleTableIcebergCatalog(mockTable);
+ }
+
+ protected IcebergCatalog mockSingleTableIcebergCatalog(IcebergTable
mockTable) throws IOException {
+ IcebergCatalog catalog = Mockito.mock(IcebergCatalog.class);
+ Mockito.when(catalog.openTable(any())).thenReturn(mockTable);
+ return catalog;
+ }
+}