This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b428a6645 [GOBBLIN-1802] Register iceberg table metadata update with
destination side catalog (#3663)
b428a6645 is described below
commit b428a6645dee5f550cfe5a932da616dd5ac5f6af
Author: meethngala <[email protected]>
AuthorDate: Mon Apr 3 09:34:44 2023 -0700
[GOBBLIN-1802] Register iceberg table metadata update with destination side
catalog (#3663)
* initial commit for iceberg table registration
* address PR comments
* replace target with dest for consistency and address PR comments
* adding pre-check for dest iceberg table
* change params for tableAlreadyExists and address checkstyle
* add pre-check for iceberg table on source and guard against missing dest
metadata while committing
* address feedback on PR
* updated javadoc
* update error message for catalog uri missing
---------
Co-authored-by: Meeth Gala <[email protected]>
---
.../copy/iceberg/BaseIcebergCatalog.java | 1 +
.../management/copy/iceberg/IcebergCatalog.java | 2 +
.../management/copy/iceberg/IcebergDataset.java | 29 +++++---
.../copy/iceberg/IcebergDatasetFinder.java | 82 +++++++++++++++-------
.../copy/iceberg/IcebergHiveCatalog.java | 11 ++-
.../copy/iceberg/IcebergRegisterStep.java | 54 ++++++++++++++
.../data/management/copy/iceberg/IcebergTable.java | 17 +++--
.../copy/iceberg/IcebergDatasetTest.java | 76 +++++++++++++-------
.../management/copy/iceberg/IcebergTableTest.java | 1 -
gradle/scripts/dependencyDefinitions.gradle | 2 +-
10 files changed, 204 insertions(+), 71 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index f3ef3309a..0ac4dcc0b 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableOperations;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 15211c8b5..ac342e2e3 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
@@ -28,4 +29,5 @@ public interface IcebergCatalog {
IcebergTable openTable(String dbName, String tableName);
String getCatalogUri();
void initialize(Map<String, String> properties, Configuration configuration);
+ boolean tableAlreadyExists(IcebergTable icebergTable);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 2119daef5..f4db7d4ff 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -48,6 +48,7 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -64,21 +65,21 @@ import
org.apache.gobblin.util.request_allocation.PushDownRequestor;
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
- private final IcebergTable icebergTable;
+ private final IcebergTable srcIcebergTable;
+ /** Presumed destination {@link IcebergTable} exists */
+ private final IcebergTable destIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make
parameterizable, if desired
- /** Target metastore URI */
- public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
- IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
- /** Target database name */
- public static final String TARGET_DATABASE_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+ /** Destination database name */
+ public static final String DESTINATION_DATABASE_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database";
- public IcebergDataset(String db, String table, IcebergTable icebergTbl,
Properties properties, FileSystem sourceFs) {
+ public IcebergDataset(String db, String table, IcebergTable srcIcebergTable,
IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
this.dbName = db;
this.inputTableName = table;
- this.icebergTable = icebergTbl;
+ this.srcIcebergTable = srcIcebergTable;
+ this.destIcebergTable = destIcebergTable;
this.properties = properties;
this.sourceFs = sourceFs;
}
@@ -154,6 +155,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
}
+ copyEntities.add(createPostPublishStep(this.srcIcebergTable,
this.destIcebergTable));
log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName,
copyEntities.size());
return copyEntities;
}
@@ -163,7 +165,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
* @return a map of path, file status for each file that needs to be copied
*/
protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem
targetFs, CopyConfiguration copyConfig) throws IOException {
- IcebergTable icebergTable = this.getIcebergTable();
+ IcebergTable icebergTable = this.getSrcIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results
while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget =
CheckedExceptionFunction.wrapToTunneled(pathStr ->
// omit considering timestamp (or other markers of freshness), as files
should be immutable
@@ -307,10 +309,15 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
}
protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
- return this.icebergTable.getDatasetDescriptor(sourceFs);
+ return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
}
protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
- return this.icebergTable.getDatasetDescriptor(targetFs);
+ return this.destIcebergTable.getDatasetDescriptor(targetFs);
+ }
+
+ private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable,
IcebergTable dstIcebergTable) {
+ IcebergRegisterStep icebergRegisterStep = new
IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+ return new PostPublishStep(getFileSetId(), Maps.newHashMap(),
icebergRegisterStep, 0);
}
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index bc111dc25..b20a1bc29 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -25,18 +25,22 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;
+
/**
* Finds {@link IcebergDataset}s. Will look for tables in a database using a
{@link IcebergCatalog},
* and creates a {@link IcebergDataset} for each one.
@@ -44,15 +48,22 @@ import org.apache.gobblin.util.HadoopUtils;
@Slf4j
@RequiredArgsConstructor
public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDataset> {
-
public static final String ICEBERG_DATASET_PREFIX =
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
public static final String ICEBERG_CLUSTER_KEY = "cluster";
- public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX +
".database.name";
- public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX +
".table.name";
public static final String ICEBERG_SRC_CATALOG_CLASS_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.class";
- public static final String ICEBERG_SRC_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
public static final String DEFAULT_ICEBERG_CATALOG_CLASS =
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+ public static final String ICEBERG_SRC_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX
+ ".source.cluster.name";
+ public static final String ICEBERG_DEST_CATALOG_CLASS_KEY =
ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
+ public static final String ICEBERG_DEST_CATALOG_URI_KEY =
ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
+ public static final String ICEBERG_DEST_CLUSTER_NAME =
ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+ public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX +
".database.name";
+ public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX +
".table.name";
+
+ public enum CatalogLocation {
+ SOURCE,
+ DESTINATION
+ }
protected final FileSystem sourceFs;
private final Properties properties;
@@ -74,18 +85,13 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
- try {
- IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
- /* Each Iceberg dataset maps to an Iceberg table
- * TODO: The user provided database and table names needs to be
pre-checked and verified against the existence of a valid Iceberg table
- */
- matchingDatasets.add(createIcebergDataset(dbName, tblName,
icebergCatalog, properties, sourceFs));
- log.info("Found {} matching datasets: {} for the database name: {} and
table name: {}", matchingDatasets.size(),
- matchingDatasets, dbName, tblName);
- return matchingDatasets;
- } catch (ReflectiveOperationException exception) {
- throw new IOException(exception);
- }
+ IcebergCatalog sourceIcebergCatalog =
createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+ IcebergCatalog destinationIcebergCatalog =
createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+ /* Each Iceberg dataset maps to an Iceberg table */
+ matchingDatasets.add(createIcebergDataset(dbName, tblName,
sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+ log.info("Found {} matching datasets: {} for the database name: {} and
table name: {}", matchingDatasets.size(),
+ matchingDatasets, dbName, tblName); // until future support added to
specify multiple icebergs, count expected always to be one
+ return matchingDatasets;
}
@Override
@@ -98,20 +104,44 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
return findDatasets().iterator();
}
- protected IcebergDataset createIcebergDataset(String dbName, String tblName,
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
- IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
- return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+ /**
+ * Requires both source and destination catalogs to connect to their
respective {@link IcebergTable}
+ * Note: the destination side {@link IcebergTable} should be present before
initiating replication
+ * @return {@link IcebergDataset} with its corresponding source and
destination {@link IcebergTable}
+ */
+ protected IcebergDataset createIcebergDataset(String dbName, String tblName,
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog,
Properties properties, FileSystem fs) throws IOException {
+ IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName,
tblName);
+
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+ IcebergTable destIcebergTable =
destinationIcebergCatalog.openTable(dbName, tblName);
+ // TODO: Rethink strategy to enforce dest iceberg table
+
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName,
tblName));
+ return new IcebergDataset(dbName, tblName, srcIcebergTable,
destIcebergTable, properties, fs);
}
- protected IcebergCatalog createIcebergCatalog(Properties properties) throws
IOException, ClassNotFoundException {
+ protected IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
Map<String, String> catalogProperties = new HashMap<>();
- String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is
required");
- catalogProperties.put(CatalogProperties.URI, catalogUri);
- // introducing an optional property for catalogs requiring cluster
specific properties
-
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
Configuration configuration =
HadoopUtils.getConfFromProperties(properties);
- String icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ String catalogUri;
+ String icebergCatalogClassName;
+ switch (location) {
+ case SOURCE:
+ catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
+ // introducing an optional property for catalogs requiring cluster
specific properties
+
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+ icebergCatalogClassName =
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ break;
+ case DESTINATION:
+ catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+ Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
+ // introducing an optional property for catalogs requiring cluster
specific properties
+
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
-> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+ icebergCatalogClassName =
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY,
DEFAULT_ICEBERG_CATALOG_CLASS);
+ break;
+ default:
+ throw new UnsupportedOperationException("Incorrect desired location:
%s provided for creating Iceberg Catalog" + location);
+ }
+ catalogProperties.put(CatalogProperties.URI, catalogUri);
return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
}
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index 5525750e6..af541a79a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -18,11 +18,13 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;
+
import lombok.extern.slf4j.Slf4j;
@@ -47,11 +49,16 @@ public class IcebergHiveCatalog extends BaseIcebergCatalog {
@Override
public String getCatalogUri() {
- return hc.getConf().get(CatalogProperties.URI, "<<not set>>");
+ return hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<<not
set>>");
}
@Override
protected TableOperations createTableOperations(TableIdentifier tableId) {
return hc.newTableOps(tableId);
}
+
+ @Override
+ public boolean tableAlreadyExists(IcebergTable icebergTable) {
+ return hc.tableExists(icebergTable.getTableId());
+ }
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
new file mode 100644
index 000000000..75f26787b
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.TableMetadata;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+
+/**
+ * {@link CommitStep} to perform Iceberg registration.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergRegisterStep implements CommitStep {
+
+ private final IcebergTable srcIcebergTable;
+ private final IcebergTable destIcebergTable;
+
+ @Override
+ public boolean isCompleted() throws IOException {
+ return false;
+ }
+
+ @Override
+ public void execute() throws IOException {
+ TableMetadata destinationMetadata = null;
+ try {
+ destinationMetadata = this.destIcebergTable.accessTableMetadata();
+ } catch (IcebergTable.TableNotFoundException tnfe) {
+ log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
+ }
+
this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(),
destinationMetadata);
+ }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index f65a81417..e8d0ee0ac 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -26,10 +26,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
@@ -44,6 +40,10 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -67,7 +67,7 @@ public class IcebergTable {
this.tableId = tableId;
}
}
-
+ @Getter
private final TableIdentifier tableId;
private final TableOperations tableOps;
private final String catalogUri;
@@ -194,4 +194,11 @@ public class IcebergTable {
descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
return descriptor;
}
+ /** Registers {@link IcebergTable} after publishing data.
+ * @param dstMetadata is null if destination {@link IcebergTable} is absent,
in which case registration is skipped */
+ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata
dstMetadata) {
+ if (dstMetadata != null) {
+ this.tableOps.commit(srcMetadata, dstMetadata);
+ }
+ }
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 9478207f8..c1872cb4a 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -188,12 +188,11 @@ public class IcebergDatasetTest {
MockFileSystemBuilder sourceFsBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
FileSystem sourceFs = sourceFsBuilder.build();
- IcebergDataset icebergDataset = new IcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+ IcebergDataset icebergDataset = new IcebergDataset(testDbName,
testTblName, icebergTable, null, new Properties(), sourceFs);
MockFileSystemBuilder destFsBuilder = new
MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destFsBuilder.build();
Mockito.doThrow(new IOException("Ha - not so
fast!")).when(destFs).getFileStatus(new
Path(SNAPSHOT_PATHS_0.manifestListPath));
-
CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
}
@@ -227,9 +226,10 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
IcebergDataset icebergDataset =
- new TrickIcebergDataset(testDbName, testTblName, icebergTable, new
Properties(), sourceFs);
+ new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -252,9 +252,10 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1,
SNAPSHOT_PATHS_0));
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1,
SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
IcebergDataset icebergDataset =
- new TrickIcebergDataset(testDbName, testTblName, icebergTable, new
Properties(), sourceFs);
+ new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -282,8 +283,9 @@ public class IcebergDatasetTest {
sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
- IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -293,7 +295,6 @@ public class IcebergDatasetTest {
// preserving attributes for owner, group and permissions
respectively
.preserve(PreserveAttributes.fromMnemonicString("ugp"))
.copyContext(new CopyContext()).build();
-
Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
verifyFsOwnershipAndPermissionPreservation(copyEntities,
sourceBuilder.getPathsAndFileStatuses());
}
@@ -310,8 +311,9 @@ public class IcebergDatasetTest {
sourceBuilder.addPaths(expectedPaths);
FileSystem sourceFs = sourceBuilder.build();
- IcebergTable icebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
- IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, icebergTable, new Properties(), sourceFs);
+ IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+ IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
+ IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName,
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -321,7 +323,6 @@ public class IcebergDatasetTest {
// without preserving attributes for owner, group and permissions
.preserve(PreserveAttributes.fromMnemonicString(""))
.copyContext(new CopyContext()).build();
-
Collection<CopyEntity> copyEntities =
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
verifyFsOwnershipAndPermissionPreservation(copyEntities,
expectedPathsAndFileStatuses);
}
@@ -348,7 +349,7 @@ public class IcebergDatasetTest {
optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
FileSystem sourceFs = sourceFsBuilder.build();
IcebergDataset icebergDataset =
- new IcebergDataset(testDbName, testTblName, icebergTable, new
Properties(), sourceFs);
+ new IcebergDataset(testDbName, testTblName, icebergTable, null, new
Properties(), sourceFs);
MockFileSystemBuilder destFsBuilder = new
MockFileSystemBuilder(DEST_FS_URI);
destFsBuilder.addPaths(existingDestPaths);
@@ -382,23 +383,40 @@ public class IcebergDatasetTest {
List<String> actual = new ArrayList<>();
for (CopyEntity copyEntity : copyEntities) {
String json = copyEntity.toString();
- String filepath =
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
- actual.add(filepath);
+ if (isCopyableFile(json)) {
+ String filepath =
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
+ actual.add(filepath);
+ } else{
+ verifyPostPublishStep(json);
+ }
}
Assert.assertEquals(actual.size(), expected.size(), "Set" +
actual.toString() + " vs Set" + expected.toString());
Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
}
+ private static boolean isCopyableFile(String json) {
+ String objectType = new Gson().fromJson(json, JsonObject.class)
+ .getAsJsonPrimitive("object-type")
+ .getAsString();
+ return
objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile");
+ }
+
private static void
verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities,
Map<Path, FileStatus> expectedPathsAndFileStatuses) {
for (CopyEntity copyEntity : copyEntities) {
String copyEntityJson = copyEntity.toString();
- List<CopyEntityDeserializer.FileOwnerAndPermissions>
ancestorFileOwnerAndPermissionsList =
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
- CopyEntityDeserializer.FileOwnerAndPermissions
destinationFileOwnerAndPermissions =
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
- Path filePath = new
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
- FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
- verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
- // providing path's parent to verify ancestor owner and permissions
- verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList,
filePath.getParent(), expectedPathsAndFileStatuses);
+ if (isCopyableFile(copyEntityJson)) {
+ List<CopyEntityDeserializer.FileOwnerAndPermissions>
ancestorFileOwnerAndPermissionsList =
+
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
+ CopyEntityDeserializer.FileOwnerAndPermissions
destinationFileOwnerAndPermissions =
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
+ Path filePath = new
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+ FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
+ verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
+ // providing path's parent to verify ancestor owner and permissions
+ verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList,
filePath.getParent(),
+ expectedPathsAndFileStatuses);
+ } else {
+ verifyPostPublishStep(copyEntityJson);
+ }
}
}
@@ -419,14 +437,21 @@ public class IcebergDatasetTest {
}
}
+ private static void verifyPostPublishStep(String json) {
+ String expectedCommitStep =
"org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
+ String actualCommitStep = new Gson().fromJson(json, JsonObject.class)
+
.getAsJsonObject("object-data").getAsJsonObject("step").getAsJsonPrimitive("object-type").getAsString();
+ Assert.assertEquals(actualCommitStep, expectedCommitStep);
+ }
+
/**
* Sadly, this is needed to avoid losing `FileSystem` mock to replacement
from the `FileSystem.get` `static`
* Without this, so to lose the mock, we'd be unable to set up any source
paths as existing.
*/
protected static class TrickIcebergDataset extends IcebergDataset {
- public TrickIcebergDataset(String db, String table, IcebergTable
icebergTbl, Properties properties,
+ public TrickIcebergDataset(String db, String table, IcebergTable
srcIcebergTbl, IcebergTable destIcebergTbl, Properties properties,
FileSystem sourceFs) {
- super(db, table, icebergTbl, properties, sourceFs);
+ super(db, table, srcIcebergTbl, destIcebergTbl, properties, sourceFs);
}
@Override // as the `static` is not mock-able
@@ -578,7 +603,8 @@ public class IcebergDatasetTest {
public static String getFilePathAsStringFromJson(String json) {
String filepath = new Gson().fromJson(json, JsonObject.class)
- .getAsJsonObject("object-data").getAsJsonObject("origin")
+ .getAsJsonObject("object-data")
+ .getAsJsonObject("origin")
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
return filepath;
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 3353c3365..20ea30610 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -40,7 +40,6 @@ import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveMetastoreTest;
import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
-
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
diff --git a/gradle/scripts/dependencyDefinitions.gradle
b/gradle/scripts/dependencyDefinitions.gradle
index 3e3db2dfd..56511ff05 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -123,7 +123,7 @@ ext.externalDependency = [
"guiceMultibindings":
"com.google.inject.extensions:guice-multibindings:4.0",
"guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
"derby": "org.apache.derby:derby:10.12.1.1",
- "mockito": "org.mockito:mockito-core:4.11.0",
+ "mockito": "org.mockito:mockito-inline:4.11.0", // upgraded to allow
mocking for constructors, static and final methods; specifically for iceberg
distcp
"salesforceWsc": "com.force.api:force-wsc:" + salesforceVersion,
"salesforcePartner": "com.force.api:force-partner-api:" +
salesforceVersion,
"scala": "org.scala-lang:scala-library:2.11.8",