This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b428a6645 [GOBBLIN-1802] Register iceberg table metadata update with 
destination side catalog (#3663)
b428a6645 is described below

commit b428a6645dee5f550cfe5a932da616dd5ac5f6af
Author: meethngala <[email protected]>
AuthorDate: Mon Apr 3 09:34:44 2023 -0700

    [GOBBLIN-1802] Register iceberg table metadata update with destination side 
catalog (#3663)
    
    * initial commit for iceberg table registration
    
    * address PR comments
    
    * replace target with dest for consistency and address PR comments
    
    * adding pre-check for dest iceberg table
    
    * change params for tableAlreadyExists and address checkstyle
    
    * add pre-check for iceberg table on source and guard against missing dest 
metadata while committing
    
    * address feedback on PR
    
    * updated javadoc
    
    * update error message for catalog uri missing
    
    ---------
    
    Co-authored-by: Meeth Gala <[email protected]>
---
 .../copy/iceberg/BaseIcebergCatalog.java           |  1 +
 .../management/copy/iceberg/IcebergCatalog.java    |  2 +
 .../management/copy/iceberg/IcebergDataset.java    | 29 +++++---
 .../copy/iceberg/IcebergDatasetFinder.java         | 82 +++++++++++++++-------
 .../copy/iceberg/IcebergHiveCatalog.java           | 11 ++-
 .../copy/iceberg/IcebergRegisterStep.java          | 54 ++++++++++++++
 .../data/management/copy/iceberg/IcebergTable.java | 17 +++--
 .../copy/iceberg/IcebergDatasetTest.java           | 76 +++++++++++++-------
 .../management/copy/iceberg/IcebergTableTest.java  |  1 -
 gradle/scripts/dependencyDefinitions.gradle        |  2 +-
 10 files changed, 204 insertions(+), 71 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index f3ef3309a..0ac4dcc0b 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.TableOperations;
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 15211c8b5..ac342e2e3 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 
 
@@ -28,4 +29,5 @@ public interface IcebergCatalog {
   IcebergTable openTable(String dbName, String tableName);
   String getCatalogUri();
   void initialize(Map<String, String> properties, Configuration configuration);
+  boolean tableAlreadyExists(IcebergTable icebergTable);
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 2119daef5..f4db7d4ff 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -48,6 +48,7 @@ import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableDataset;
 import org.apache.gobblin.data.management.copy.CopyableFile;
 import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import 
org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
 import org.apache.gobblin.data.management.partition.FileSet;
 import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -64,21 +65,21 @@ import 
org.apache.gobblin.util.request_allocation.PushDownRequestor;
 public class IcebergDataset implements PrioritizedCopyableDataset {
   private final String dbName;
   private final String inputTableName;
-  private final IcebergTable icebergTable;
+  private final IcebergTable srcIcebergTable;
+  /** Presumed destination {@link IcebergTable} exists */
+  private final IcebergTable destIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make 
parameterizable, if desired
 
-  /** Target metastore URI */
-  public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-      IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
-  /** Target database name */
-  public static final String TARGET_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+  /** Destination database name */
+  public static final String DESTINATION_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database";
 
-  public IcebergDataset(String db, String table, IcebergTable icebergTbl, 
Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, 
IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
     this.dbName = db;
     this.inputTableName = table;
-    this.icebergTable = icebergTbl;
+    this.srcIcebergTable = srcIcebergTable;
+    this.destIcebergTable = destIcebergTable;
     this.properties = properties;
     this.sourceFs = sourceFs;
   }
@@ -154,6 +155,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
     }
+    copyEntities.add(createPostPublishStep(this.srcIcebergTable, 
this.destIcebergTable));
     log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, 
copyEntities.size());
     return copyEntities;
   }
@@ -163,7 +165,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
    * @return a map of path, file status for each file that needs to be copied
    */
   protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem 
targetFs, CopyConfiguration copyConfig) throws IOException {
-    IcebergTable icebergTable = this.getIcebergTable();
+    IcebergTable icebergTable = this.getSrcIcebergTable();
     /** @return whether `pathStr` is present on `targetFs`, caching results 
while tunneling checked exceptions outward */
     Function<String, Boolean> isPresentOnTarget = 
CheckedExceptionFunction.wrapToTunneled(pathStr ->
       // omit considering timestamp (or other markers of freshness), as files 
should be immutable
@@ -307,10 +309,15 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
   }
 
   protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
-    return this.icebergTable.getDatasetDescriptor(sourceFs);
+    return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
   }
 
   protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
-    return this.icebergTable.getDatasetDescriptor(targetFs);
+    return this.destIcebergTable.getDatasetDescriptor(targetFs);
+  }
+
+  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, 
IcebergTable dstIcebergTable) {
+    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+    return new PostPublishStep(getFileSetId(), Maps.newHashMap(), 
icebergRegisterStep, 0);
   }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index bc111dc25..b20a1bc29 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -25,18 +25,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
 
+
 /**
  * Finds {@link IcebergDataset}s. Will look for tables in a database using a 
{@link IcebergCatalog},
  * and creates a {@link IcebergDataset} for each one.
@@ -44,15 +48,22 @@ import org.apache.gobblin.util.HadoopUtils;
 @Slf4j
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDataset> {
-
   public static final String ICEBERG_DATASET_PREFIX = 
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
   public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + 
".database.name";
-  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + 
".table.name";
   public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.class";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_SRC_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
   public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX 
+ ".source.cluster.name";
+  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = 
ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
+  public static final String ICEBERG_DEST_CATALOG_URI_KEY = 
ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
+  public static final String ICEBERG_DEST_CLUSTER_NAME = 
ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + 
".database.name";
+  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + 
".table.name";
+
+  public enum CatalogLocation {
+    SOURCE,
+    DESTINATION
+  }
 
   protected final FileSystem sourceFs;
   private final Properties properties;
@@ -74,18 +85,13 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
     String dbName = properties.getProperty(ICEBERG_DB_NAME);
     String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
 
-    try {
-      IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
-      /* Each Iceberg dataset maps to an Iceberg table
-       * TODO: The user provided database and table names needs to be 
pre-checked and verified against the existence of a valid Iceberg table
-       */
-      matchingDatasets.add(createIcebergDataset(dbName, tblName, 
icebergCatalog, properties, sourceFs));
-      log.info("Found {} matching datasets: {} for the database name: {} and 
table name: {}", matchingDatasets.size(),
-          matchingDatasets, dbName, tblName);
-      return matchingDatasets;
-    } catch (ReflectiveOperationException exception) {
-      throw new IOException(exception);
-    }
+    IcebergCatalog sourceIcebergCatalog = 
createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+    IcebergCatalog destinationIcebergCatalog = 
createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+    /* Each Iceberg dataset maps to an Iceberg table */
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, 
sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+    log.info("Found {} matching datasets: {} for the database name: {} and 
table name: {}", matchingDatasets.size(),
+        matchingDatasets, dbName, tblName); // until future support added to 
specify multiple icebergs, count expected always to be one
+    return matchingDatasets;
   }
 
   @Override
@@ -98,20 +104,44 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
     return findDatasets().iterator();
   }
 
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-    IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-    return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+  /**
+   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Note: the destination side {@link IcebergTable} should be present before 
initiating replication
+   * @return {@link IcebergDataset} with its corresponding source and 
destination {@link IcebergTable}
+   */
+  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, 
Properties properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
+    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
+    IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(dbName, tblName);
+    // TODO: Rethink strategy to enforce dest iceberg table
+    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, 
tblName));
+    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties) throws 
IOException, ClassNotFoundException {
+  protected IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
     Map<String, String> catalogProperties = new HashMap<>();
-    String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-    Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is 
required");
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
-    // introducing an optional property for catalogs requiring cluster 
specific properties
-    
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
-    String icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+    String catalogUri;
+    String icebergCatalogClassName;
+    switch (location) {
+      case SOURCE:
+        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source 
Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = 
properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      case DESTINATION:
+        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination 
Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
+        // introducing an optional property for catalogs requiring cluster 
specific properties
+        
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value
 -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+        icebergCatalogClassName = 
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
+        break;
+      default:
+        throw new UnsupportedOperationException("Incorrect desired location: 
%s provided for creating Iceberg Catalog" + location);
+    }
+    catalogProperties.put(CatalogProperties.URI, catalogUri);
     return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
   }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index 5525750e6..af541a79a 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -18,11 +18,13 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.CatalogProperties;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveCatalog;
+
 import lombok.extern.slf4j.Slf4j;
 
 
@@ -47,11 +49,16 @@ public class IcebergHiveCatalog extends BaseIcebergCatalog {
 
   @Override
   public String getCatalogUri() {
-    return hc.getConf().get(CatalogProperties.URI, "<<not set>>");
+    return hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<<not 
set>>");
   }
 
   @Override
   protected TableOperations createTableOperations(TableIdentifier tableId) {
     return hc.newTableOps(tableId);
   }
+
+  @Override
+  public boolean tableAlreadyExists(IcebergTable icebergTable) {
+    return hc.tableExists(icebergTable.getTableId());
+  }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
new file mode 100644
index 000000000..75f26787b
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+
+import org.apache.iceberg.TableMetadata;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+
+/**
+ * {@link CommitStep} to perform Iceberg registration.
+ */
+@Slf4j
+@AllArgsConstructor
+public class IcebergRegisterStep implements CommitStep {
+
+  private final IcebergTable srcIcebergTable;
+  private final IcebergTable destIcebergTable;
+
+  @Override
+  public boolean isCompleted() throws IOException {
+    return false;
+  }
+
+  @Override
+  public void execute() throws IOException {
+    TableMetadata destinationMetadata = null;
+    try {
+      destinationMetadata = this.destIcebergTable.accessTableMetadata();
+    } catch (IcebergTable.TableNotFoundException tnfe) {
+      log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
+    }
+    
this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(),
 destinationMetadata);
+  }
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index f65a81417..e8d0ee0ac 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -26,10 +26,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
@@ -44,6 +40,10 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 
@@ -67,7 +67,7 @@ public class IcebergTable {
       this.tableId = tableId;
     }
   }
-
+  @Getter
   private final TableIdentifier tableId;
   private final TableOperations tableOps;
   private final String catalogUri;
@@ -194,4 +194,11 @@ public class IcebergTable {
     descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
     return descriptor;
   }
+  /** Registers {@link IcebergTable} after publishing data.
+   * @param dstMetadata is null if destination {@link IcebergTable} is absent, 
in which case registration is skipped */
+  protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata 
dstMetadata) {
+    if (dstMetadata != null) {
+      this.tableOps.commit(srcMetadata, dstMetadata);
+    }
+  }
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 9478207f8..c1872cb4a 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -188,12 +188,11 @@ public class IcebergDatasetTest {
 
     MockFileSystemBuilder sourceFsBuilder = new 
MockFileSystemBuilder(SRC_FS_URI);
     FileSystem sourceFs = sourceFsBuilder.build();
-    IcebergDataset icebergDataset = new IcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergDataset icebergDataset = new IcebergDataset(testDbName, 
testTblName, icebergTable, null, new Properties(), sourceFs);
 
     MockFileSystemBuilder destFsBuilder = new 
MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destFsBuilder.build();
     Mockito.doThrow(new IOException("Ha - not so 
fast!")).when(destFs).getFileStatus(new 
Path(SNAPSHOT_PATHS_0.manifestListPath));
-
     CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
     icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
   }
@@ -227,9 +226,10 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
     IcebergDataset icebergDataset =
-        new TrickIcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs);
+        new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -252,9 +252,10 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, 
SNAPSHOT_PATHS_0));
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, 
SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
     IcebergDataset icebergDataset =
-        new TrickIcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs);
+        new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -282,8 +283,9 @@ public class IcebergDatasetTest {
     sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -293,7 +295,6 @@ public class IcebergDatasetTest {
             // preserving attributes for owner, group and permissions 
respectively
             .preserve(PreserveAttributes.fromMnemonicString("ugp"))
             .copyContext(new CopyContext()).build();
-
     Collection<CopyEntity> copyEntities = 
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
     verifyFsOwnershipAndPermissionPreservation(copyEntities, 
sourceBuilder.getPathsAndFileStatuses());
   }
@@ -310,8 +311,9 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, icebergTable, new Properties(), sourceFs);
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -321,7 +323,6 @@ public class IcebergDatasetTest {
             // without preserving attributes for owner, group and permissions
             .preserve(PreserveAttributes.fromMnemonicString(""))
             .copyContext(new CopyContext()).build();
-
     Collection<CopyEntity> copyEntities = 
icebergDataset.generateCopyEntities(destFs, copyConfiguration);
     verifyFsOwnershipAndPermissionPreservation(copyEntities, 
expectedPathsAndFileStatuses);
   }
@@ -348,7 +349,7 @@ public class IcebergDatasetTest {
     optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
     FileSystem sourceFs = sourceFsBuilder.build();
     IcebergDataset icebergDataset =
-        new IcebergDataset(testDbName, testTblName, icebergTable, new 
Properties(), sourceFs);
+        new IcebergDataset(testDbName, testTblName, icebergTable, null, new 
Properties(), sourceFs);
 
     MockFileSystemBuilder destFsBuilder = new 
MockFileSystemBuilder(DEST_FS_URI);
     destFsBuilder.addPaths(existingDestPaths);
@@ -382,23 +383,40 @@ public class IcebergDatasetTest {
     List<String> actual = new ArrayList<>();
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
-      String filepath = 
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
-      actual.add(filepath);
+      if (isCopyableFile(json)) {
+        String filepath = 
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
+        actual.add(filepath);
+      } else{
+        verifyPostPublishStep(json);
+      }
     }
     Assert.assertEquals(actual.size(), expected.size(), "Set" + 
actual.toString() + " vs Set" + expected.toString());
     Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
   }
 
+  private static boolean isCopyableFile(String json) {
+    String objectType = new Gson().fromJson(json, JsonObject.class)
+        .getAsJsonPrimitive("object-type")
+        .getAsString();
+    return 
objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile");
+  }
+
   private static void 
verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, 
Map<Path, FileStatus> expectedPathsAndFileStatuses) {
     for (CopyEntity copyEntity : copyEntities) {
       String copyEntityJson = copyEntity.toString();
-      List<CopyEntityDeserializer.FileOwnerAndPermissions> 
ancestorFileOwnerAndPermissionsList = 
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
-      CopyEntityDeserializer.FileOwnerAndPermissions 
destinationFileOwnerAndPermissions = 
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
-      Path filePath = new 
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
-      FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
-      verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
-      // providing path's parent to verify ancestor owner and permissions
-      verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, 
filePath.getParent(), expectedPathsAndFileStatuses);
+      if (isCopyableFile(copyEntityJson)) {
+        List<CopyEntityDeserializer.FileOwnerAndPermissions> 
ancestorFileOwnerAndPermissionsList =
+            
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
+        CopyEntityDeserializer.FileOwnerAndPermissions 
destinationFileOwnerAndPermissions = 
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
+        Path filePath = new 
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+        FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
+        verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
+        // providing path's parent to verify ancestor owner and permissions
+        verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, 
filePath.getParent(),
+            expectedPathsAndFileStatuses);
+      } else {
+        verifyPostPublishStep(copyEntityJson);
+      }
     }
   }
 
@@ -419,14 +437,21 @@ public class IcebergDatasetTest {
     }
   }
 
+  private static void verifyPostPublishStep(String json) {
+    String expectedCommitStep = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
+    String actualCommitStep = new Gson().fromJson(json, JsonObject.class)
+        
.getAsJsonObject("object-data").getAsJsonObject("step").getAsJsonPrimitive("object-type").getAsString();
+    Assert.assertEquals(actualCommitStep, expectedCommitStep);
+  }
+
   /**
    *  Sadly, this is needed to avoid losing `FileSystem` mock to replacement 
from the `FileSystem.get` `static`
    *  Without this, so to lose the mock, we'd be unable to set up any source 
paths as existing.
    */
   protected static class TrickIcebergDataset extends IcebergDataset {
-    public TrickIcebergDataset(String db, String table, IcebergTable 
icebergTbl, Properties properties,
+    public TrickIcebergDataset(String db, String table, IcebergTable 
srcIcebergTbl, IcebergTable destIcebergTbl, Properties properties,
         FileSystem sourceFs) {
-      super(db, table, icebergTbl, properties, sourceFs);
+      super(db, table, srcIcebergTbl, destIcebergTbl, properties, sourceFs);
     }
 
     @Override // as the `static` is not mock-able
@@ -578,7 +603,8 @@ public class IcebergDatasetTest {
 
     public static String getFilePathAsStringFromJson(String json) {
       String filepath = new Gson().fromJson(json, JsonObject.class)
-              .getAsJsonObject("object-data").getAsJsonObject("origin")
+              .getAsJsonObject("object-data")
+              .getAsJsonObject("origin")
               
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
               
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
       return filepath;
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 3353c3365..20ea30610 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -40,7 +40,6 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveMetastoreTest;
 import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
-
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
diff --git a/gradle/scripts/dependencyDefinitions.gradle 
b/gradle/scripts/dependencyDefinitions.gradle
index 3e3db2dfd..56511ff05 100644
--- a/gradle/scripts/dependencyDefinitions.gradle
+++ b/gradle/scripts/dependencyDefinitions.gradle
@@ -123,7 +123,7 @@ ext.externalDependency = [
     "guiceMultibindings": 
"com.google.inject.extensions:guice-multibindings:4.0",
     "guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
     "derby": "org.apache.derby:derby:10.12.1.1",
-    "mockito": "org.mockito:mockito-core:4.11.0",
+    "mockito": "org.mockito:mockito-inline:4.11.0", // upgraded to allow 
mocking for constructors, static and final methods; specifically for iceberg 
distcp
     "salesforceWsc": "com.force.api:force-wsc:" + salesforceVersion,
     "salesforcePartner": "com.force.api:force-partner-api:" + 
salesforceVersion,
     "scala": "org.scala-lang:scala-library:2.11.8",


Reply via email to