This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 97e845915 [GOBBLIN-1961] Allow `IcebergDatasetFinder` to use separate names for source vs. destination-side DB and table (#3835)
97e845915 is described below

commit 97e845915b9374d82791b3782e4279531df429f3
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Nov 22 11:20:36 2023 -0800

    [GOBBLIN-1961] Allow `IcebergDatasetFinder` to use separate names for source vs. destination-side DB and table (#3835)
    
    * Allow `IcebergDatasetFinder` to use separate names for source vs. destination-side DB and table
    
    * Adjust `Mockito.verify` in `IcebergDatasetTest` to also expect the new `getTableId()` call on the mock table
---
 .../management/copy/iceberg/IcebergCatalog.java    | 11 +++
 .../management/copy/iceberg/IcebergDataset.java    | 45 +++++------
 .../copy/iceberg/IcebergDatasetFinder.java         | 87 +++++++++++++++-------
 .../copy/iceberg/IcebergRegisterStep.java          | 23 ++++--
 .../copy/iceberg/IcebergDatasetTest.java           | 49 ++++++------
 5 files changed, 133 insertions(+), 82 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index ac342e2e3..5794a4c03 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -20,14 +20,25 @@ package org.apache.gobblin.data.management.copy.iceberg;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.catalog.TableIdentifier;
 
 
 /**
  * Any catalog from which to access {@link IcebergTable}s.
  */
 public interface IcebergCatalog {
+
   IcebergTable openTable(String dbName, String tableName);
+
+  default IcebergTable openTable(TableIdentifier tableId) {
+    // CHALLENGE: clearly better to implement in the reverse direction - 
`openTable(String, String)` in terms of `openTable(TableIdentifier)` -
+    // but challenging to do at this point, with multiple derived classes 
already "in the wild" that implement `openTable(String, String)`
+    return openTable(tableId.namespace().toString(), tableId.name());
+  }
+
   String getCatalogUri();
+
   void initialize(Map<String, String> properties, Configuration configuration);
+
   boolean tableAlreadyExists(IcebergTable icebergTable);
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index a59fc3688..67a10a42d 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -63,10 +63,8 @@ import 
org.apache.gobblin.util.request_allocation.PushDownRequestor;
 @Slf4j
 @Getter
 public class IcebergDataset implements PrioritizedCopyableDataset {
-  private final String dbName;
-  private final String inputTableName;
   private final IcebergTable srcIcebergTable;
-  /** Presumed destination {@link IcebergTable} exists */
+  /* CAUTION: *hopefully* `destIcebergTable` exists... although that's not 
necessarily been verified yet */
   private final IcebergTable destIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
@@ -75,9 +73,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
   /** Destination database name */
   public static final String DESTINATION_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database";
 
-  public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, 
IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
-    this.dbName = db;
-    this.inputTableName = table;
+  public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable 
destIcebergTable, Properties properties, FileSystem sourceFs) {
     this.srcIcebergTable = srcIcebergTable;
     this.destIcebergTable = destIcebergTable;
     this.properties = properties;
@@ -117,9 +113,9 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
     return createFileSets(targetFs, configuration);
   }
 
-  /** @return unique ID for this dataset, usable as a {@link 
CopyEntity}.fileset, for atomic publication grouping */
+  /** @return unique ID for dataset (based on the source-side table), usable 
as a {@link CopyEntity#getFileSet}, for atomic publication grouping */
   protected String getFileSetId() {
-    return this.dbName + "." + this.inputTableName;
+    return this.srcIcebergTable.getTableId().toString();
   }
 
   /**
@@ -127,7 +123,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
    * comprising the iceberg/table, so as to fully specify remaining table 
replication.
    */
   protected Iterator<FileSet<CopyEntity>> createFileSets(FileSystem targetFs, 
CopyConfiguration configuration) {
-    FileSet<CopyEntity> fileSet = new 
IcebergTableFileSet(this.getInputTableName(), this, targetFs, configuration);
+    FileSet<CopyEntity> fileSet = new IcebergTableFileSet(this.getFileSetId(), 
this, targetFs, configuration);
     return Iterators.singletonIterator(fileSet);
   }
 
@@ -140,7 +136,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
     String fileSet = this.getFileSetId();
     List<CopyEntity> copyEntities = Lists.newArrayList();
     Map<Path, FileStatus> pathToFileStatus = 
getFilePathsToFileStatus(targetFs, copyConfig);
-    log.info("~{}.{}~ found {} candidate source paths", dbName, 
inputTableName, pathToFileStatus.size());
+    log.info("~{}~ found {} candidate source paths", fileSet, 
pathToFileStatus.size());
 
     Configuration defaultHadoopConfiguration = new Configuration();
     for (Map.Entry<Path, FileStatus> entry : pathToFileStatus.entrySet()) {
@@ -165,8 +161,8 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
       copyEntities.add(fileEntity);
     }
     // TODO: Filter properties specific to iceberg registration and avoid 
serializing every global property
-    copyEntities.add(createPostPublishStep(this.dbName, this.inputTableName, 
this.properties));
-    log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, 
copyEntities.size());
+    copyEntities.add(createPostPublishStep());
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
     return copyEntities;
   }
 
@@ -187,8 +183,8 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
     IcebergSnapshotInfo currentSnapshotOverview = 
icebergTable.getCurrentSnapshotInfoOverviewOnly();
     if 
(currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) 
&&
         
isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
-      log.info("~{}.{}~ skipping entire iceberg, since snapshot '{}' at '{}' 
and metadata '{}' both present on target",
-          dbName, inputTableName, currentSnapshotOverview.getSnapshotId(),
+      log.info("~{}~ skipping entire iceberg, since snapshot '{}' at '{}' and 
metadata '{}' both present on target",
+          this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
           currentSnapshotOverview.getManifestListPath(),
           currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: 
MISSING!>>"));
       return Maps.newHashMap();
@@ -198,7 +194,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
           // log each snapshot, for context, in case of 
`FileNotFoundException` during `FileSystem.getFileStatus()`
           String manListPath = snapshotInfo.getManifestListPath();
-          log.info("~{}.{}~ loaded snapshot '{}' at '{}' from metadata path: 
'{}'", dbName, inputTableName,
+          log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: 
'{}'", this.getFileSetId(),
               snapshotInfo.getSnapshotId(), manListPath, 
snapshotInfo.getMetadataPath().orElse("<<inherited>>"));
           // ALGO: an iceberg's files form a tree of four levels: 
metadata.json -> manifest-list -> manifest -> data;
           // most critically, all are presumed immutable and uniquely named, 
although any may be replaced.  we depend
@@ -224,18 +220,17 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
                 missingPaths.addAll(mfi.getListedFilePaths());
               }
             }
-            log.info("~{}.{}~ snapshot '{}': collected {} additional source 
paths",
-                dbName, inputTableName, snapshotInfo.getSnapshotId(), 
missingPaths.size());
+            log.info("~{}~ snapshot '{}': collected {} additional source 
paths",
+                this.getFileSetId(), snapshotInfo.getSnapshotId(), 
missingPaths.size());
             return missingPaths.iterator();
           } else {
-            log.info("~{}.{}~ snapshot '{}' already present on target... 
skipping (including contents)",
-                dbName, inputTableName, snapshotInfo.getSnapshotId());
+            log.info("~{}~ snapshot '{}' already present on target... skipping 
(including contents)",
+                this.getFileSetId(), snapshotInfo.getSnapshotId());
             // IMPORTANT: separately consider metadata path, to handle case of 
'metadata-only' snapshot reusing mf-list
             Optional<String> metadataPath = snapshotInfo.getMetadataPath();
             Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p 
-> !isPresentOnTarget.apply(p));
             metadataPath.ifPresent(ignore ->
-                log.info("~{}.{}~ metadata IS {} already present on target", 
dbName, inputTableName,
-                    nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
+                log.info("~{}~ metadata IS {} already present on target", 
this.getFileSetId(), nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
             );
             return nonReplicatedMetadataPath.map(p -> 
Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
           }
@@ -255,7 +250,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
         try {
           results.put(path, this.sourceFs.getFileStatus(path));
           if (growthTracker.isAnotherMilestone(results.size())) {
-            log.info("~{}.{}~ collected file status on '{}' source paths", 
dbName, inputTableName, results.size());
+            log.info("~{}~ collected file status on '{}' source paths", 
this.getFileSetId(), results.size());
           }
         } catch (FileNotFoundException fnfe) {
           if (!shouldTolerateMissingSourceFiles) {
@@ -265,7 +260,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
             String total = ++numSourceFilesNotFound + " total";
             String speculation = "either premature deletion broke time-travel 
or metadata read interleaved among delete";
             errorConsolidator.prepLogMsg(path).ifPresent(msg ->
-                log.warn("~{}.{}~ source {} ({}... {})", dbName, 
inputTableName, msg, speculation, total)
+                log.warn("~{}~ source {} ({}... {})", this.getFileSetId(), 
msg, speculation, total)
             );
           }
         }
@@ -326,8 +321,8 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep(String dbName, String 
inputTableName, Properties properties) {
-    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(dbName, 
inputTableName, properties);
+  private PostPublishStep createPostPublishStep() {
+    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(this.srcIcebergTable.getTableId(), 
this.destIcebergTable.getTableId(), this.properties);
     return new PostPublishStep(getFileSetId(), Maps.newHashMap(), 
icebergRegisterStep, 0);
   }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index beded5a72..dc407f38c 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -55,22 +55,28 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
   public static final String ICEBERG_CATALOG_KEY = "catalog";
   /**
-   * This is used with a prefix: "{@link 
IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "(source or destination)" 
+ "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
-   * It is an open-ended pattern used to pass arbitrary catalog specific 
properties
+   * This is used with a prefix: "{@link 
IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "( source | destination 
)" + "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
+   * It is an open-ended pattern used to pass arbitrary catalog-scoped 
properties
    */
   public static final String ICEBERG_CATALOG_CLASS_KEY = "class";
-  public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + 
".database.name";
-  public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + 
".table.name";
+  public static final String ICEBERG_DB_NAME_KEY = "database.name";
+  public static final String ICEBERG_TABLE_NAME_KEY = "table.name";
+  /** please use source/dest-scoped properties */
+  @Deprecated
+  public static final String ICEBERG_DB_NAME_LEGACY = ICEBERG_DATASET_PREFIX + 
"." + ICEBERG_DB_NAME_KEY;
+  /** please use source/dest-scoped properties */
+  @Deprecated
+  public static final String ICEBERG_TABLE_NAME_LEGACY = 
ICEBERG_DATASET_PREFIX + "." + ICEBERG_TABLE_NAME_KEY;
 
   public enum CatalogLocation {
     SOURCE,
     DESTINATION;
 
     /**
-     * Provides prefix for configs based on the catalog location to filter 
catalog specific properties
+     * Provides prefix for configs based on the catalog orientation (source or 
destination) for catalog-targeted properties
      */
     public String getConfigPrefix() {
-      return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() + 
"." + ICEBERG_CATALOG_KEY + ".";
+      return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() + 
".";
     }
   }
 
@@ -86,20 +92,38 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
    */
   @Override
   public List<IcebergDataset> findDatasets() throws IOException {
-    List<IcebergDataset> matchingDatasets = new ArrayList<>();
-    if (StringUtils.isBlank(properties.getProperty(ICEBERG_DB_NAME)) || 
StringUtils.isBlank(properties.getProperty(ICEBERG_TABLE_NAME))) {
-      throw new IllegalArgumentException(String.format("Iceberg database name: 
{%s} or Iceberg table name: {%s} is missing",
-          ICEBERG_DB_NAME, ICEBERG_TABLE_NAME));
+    String srcDbName = getLocationQualifiedProperty(properties, 
CatalogLocation.SOURCE, ICEBERG_DB_NAME_KEY);
+    String destDbName = getLocationQualifiedProperty(properties, 
CatalogLocation.DESTINATION, ICEBERG_DB_NAME_KEY);
+    // TODO: eventually remove support for combo (src+dest) iceberg props, in 
favor of separate source/dest-scoped props; for now, maintain support
+    if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(destDbName)) {
+      srcDbName = destDbName = properties.getProperty(ICEBERG_DB_NAME_LEGACY);
+    }
+    String srcTableName = getLocationQualifiedProperty(properties, 
CatalogLocation.SOURCE, ICEBERG_TABLE_NAME_KEY);
+    String destTableName = getLocationQualifiedProperty(properties, 
CatalogLocation.DESTINATION, ICEBERG_TABLE_NAME_KEY);
+    // TODO: eventually remove support for combo (src+dest) iceberg props, in 
favor of separate source/dest-scoped props; for now, maintain support
+    if (StringUtils.isBlank(srcTableName) || 
StringUtils.isBlank(destTableName)) {
+      srcTableName = destTableName = 
properties.getProperty(ICEBERG_TABLE_NAME_LEGACY);
+    }
+    if (StringUtils.isBlank(srcDbName) || StringUtils.isBlank(srcTableName)) {
+      throw new IllegalArgumentException(
+          String.format("Missing (at least some) IcebergDataset properties - 
source: ('%s' and '%s') and destination: ('%s' and '%s') "
+                  + "or [deprecated!] common/combo: ('%s' and '%s')",
+              calcLocationQualifiedPropName(CatalogLocation.SOURCE, 
ICEBERG_DB_NAME_KEY),
+              calcLocationQualifiedPropName(CatalogLocation.SOURCE, 
ICEBERG_TABLE_NAME_KEY),
+              calcLocationQualifiedPropName(CatalogLocation.DESTINATION, 
ICEBERG_DB_NAME_KEY),
+              calcLocationQualifiedPropName(CatalogLocation.DESTINATION, 
ICEBERG_TABLE_NAME_KEY),
+              ICEBERG_DB_NAME_LEGACY,
+              ICEBERG_TABLE_NAME_LEGACY));
     }
-    String dbName = properties.getProperty(ICEBERG_DB_NAME);
-    String tblName = properties.getProperty(ICEBERG_TABLE_NAME);
 
-    IcebergCatalog sourceIcebergCatalog = 
createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
-    IcebergCatalog destinationIcebergCatalog = 
createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+    IcebergCatalog srcIcebergCatalog = createIcebergCatalog(this.properties, 
CatalogLocation.SOURCE);
+    IcebergCatalog destIcebergCatalog = createIcebergCatalog(this.properties, 
CatalogLocation.DESTINATION);
+
     /* Each Iceberg dataset maps to an Iceberg table */
-    matchingDatasets.add(createIcebergDataset(dbName, tblName, 
sourceIcebergCatalog, destinationIcebergCatalog, this.properties, 
this.sourceFs));
-    log.info("Found {} matching datasets: {} for the database name: {} and 
table name: {}", matchingDatasets.size(),
-        matchingDatasets, dbName, tblName); // until future support added to 
specify multiple icebergs, count expected always to be one
+    List<IcebergDataset> matchingDatasets = new ArrayList<>();
+    matchingDatasets.add(createIcebergDataset(srcIcebergCatalog, srcDbName, 
srcTableName, destIcebergCatalog, destDbName, destTableName, this.properties, 
this.sourceFs));
+    log.info("Found {} matching datasets: {} for the (source) '{}.{}' / (dest) 
'{}.{}'", matchingDatasets.size(),
+        matchingDatasets, srcDbName, srcTableName, destDbName, destTableName); 
// until future support added to specify multiple icebergs, count expected 
always to be one
     return matchingDatasets;
   }
 
@@ -114,28 +138,39 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
   }
 
   /**
-   * Requires both source and destination catalogs to connect to their 
respective {@link IcebergTable}
+   * Uses each source and destination {@link IcebergCatalog} to load and 
verify existence of the respective {@link IcebergTable}
    * Note: the destination side {@link IcebergTable} should be present before 
initiating replication
+   *
    * @return {@link IcebergDataset} with its corresponding source and 
destination {@link IcebergTable}
    */
-  protected IcebergDataset createIcebergDataset(String dbName, String tblName, 
IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, 
Properties properties, FileSystem fs) throws IOException {
-    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, 
tblName);
-    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
-    IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(dbName, tblName);
+  protected IcebergDataset createIcebergDataset(IcebergCatalog 
sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog 
destinationIcebergCatalog, String destDbName, String destTableName, Properties 
properties, FileSystem fs) throws IOException {
+    IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, 
srcTableName);
+    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, 
srcTableName));
+    IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(destDbName, destTableName);
     // TODO: Rethink strategy to enforce dest iceberg table
-    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, 
tblName));
-    return new IcebergDataset(dbName, tblName, srcIcebergTable, 
destIcebergTable, properties, fs);
+    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, 
destTableName));
+    return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, 
fs);
   }
 
   protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
-    String prefix = location.getConfigPrefix();
-    Map<String, String> catalogProperties = 
buildMapFromPrefixChildren(properties, prefix);
+    String catalogPrefix = calcLocationQualifiedPropName(location, 
ICEBERG_CATALOG_KEY + ".");
+    Map<String, String> catalogProperties = 
buildMapFromPrefixChildren(properties, catalogPrefix);
     // TODO: Filter properties specific to Hadoop
     Configuration configuration = 
HadoopUtils.getConfFromProperties(properties);
     String icebergCatalogClassName = 
catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS_KEY, 
DEFAULT_ICEBERG_CATALOG_CLASS);
     return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
   }
 
+  /** @return property value or `null` */
+  protected static String getLocationQualifiedProperty(Properties properties, 
CatalogLocation location, String relativePropName) {
+    return properties.getProperty(calcLocationQualifiedPropName(location, 
relativePropName));
+  }
+
+  /** @return absolute (`location`-qualified) property name for 
`relativePropName` */
+  protected static String calcLocationQualifiedPropName(CatalogLocation 
location, String relativePropName) {
+    return location.getConfigPrefix() + relativePropName;
+  }
+
   /**
    * Filters the properties based on a prefix using {@link 
ConfigBuilder#loadProps(Properties, String)} and creates a {@link Map}
    */
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index 8f32f8cc0..8beb72e66 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -21,23 +21,30 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.catalog.TableIdentifier;
 
-import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.commit.CommitStep;
 
+
 /**
  * {@link CommitStep} to perform Iceberg registration.
  */
 @Slf4j
-@AllArgsConstructor
 public class IcebergRegisterStep implements CommitStep {
 
-  private final String dbName;
-  private final String tblName;
+  // store as string for serializability... TODO: explore whether truly 
necessary (or we could just as well store as `TableIdentifier`)
+  private final String srcTableIdStr;
+  private final String destTableIdStr;
   private final Properties properties;
 
+  public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier 
destTableId, Properties properties) {
+    this.srcTableIdStr = srcTableId.toString();
+    this.destTableIdStr = destTableId.toString();
+    this.properties = properties;
+  }
+
   @Override
   public boolean isCompleted() throws IOException {
     return false;
@@ -46,12 +53,12 @@ public class IcebergRegisterStep implements CommitStep {
   @Override
   public void execute() throws IOException {
     IcebergTable srcIcebergTable = 
IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.SOURCE)
-        .openTable(this.dbName, this.tblName);
+        .openTable(TableIdentifier.parse(srcTableIdStr));
     IcebergTable destIcebergTable = 
IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION)
-        .openTable(this.dbName, this.tblName);
+        .openTable(TableIdentifier.parse(destTableIdStr));
     TableMetadata destinationMetadata = null;
     try {
-      destinationMetadata = destIcebergTable.accessTableMetadata();
+      destinationMetadata = destIcebergTable.accessTableMetadata(); // 
probe... (first access could throw)
     } catch (IcebergTable.TableNotFoundException tnfe) {
       log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
     }
@@ -59,6 +66,6 @@ public class IcebergRegisterStep implements CommitStep {
   }
   @Override
   public String toString() {
-    return String.format("Registering Iceberg Table: {%s}.{%s} ", this.dbName, 
this.tblName);
+    return String.format("Registering Iceberg Table: {%s} (dest); (src: 
{%s})", this.destTableIdStr, this.srcTableIdStr);
   }
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 09238445c..0b485e1df 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -193,17 +193,18 @@ public class IcebergDatasetTest {
         validateGetFilePathsGivenDestState(icebergSnapshots, 
existingDestPaths, expectedResultPaths);
     // ensure short-circuiting was able to avert iceberg manifests scan
     Mockito.verify(mockTable, 
Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
+    Mockito.verify(mockTable, Mockito.times(1)).getTableId();
     Mockito.verifyNoMoreInteractions(mockTable);
   }
 
   /** Exception wrapping is used internally--ensure that doesn't lapse into 
silently swallowing errors */
   @Test(expectedExceptions = IOException.class)
   public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws 
IOException {
-    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(Lists.newArrayList(SNAPSHOT_PATHS_0));
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), 
Lists.newArrayList(SNAPSHOT_PATHS_0));
 
     MockFileSystemBuilder sourceFsBuilder = new 
MockFileSystemBuilder(SRC_FS_URI);
     FileSystem sourceFs = sourceFsBuilder.build();
-    IcebergDataset icebergDataset = new IcebergDataset(testDbName, 
testTblName, icebergTable, null, new Properties(), sourceFs);
+    IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, 
new Properties(), sourceFs);
 
     MockFileSystemBuilder destFsBuilder = new 
MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destFsBuilder.build();
@@ -241,10 +242,10 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset =
-        new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
+    TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
+    IcebergTable srcIcebergTbl = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTbl = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, 
destIcebergTbl, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -267,10 +268,10 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, 
SNAPSHOT_PATHS_0));
-    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset =
-        new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
+    TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1, 
SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -298,9 +299,10 @@ public class IcebergDatasetTest {
     sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
+    TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -326,9 +328,10 @@ public class IcebergDatasetTest {
     sourceBuilder.addPaths(expectedPaths);
     FileSystem sourceFs = sourceBuilder.build();
 
-    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0));
-    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs);
+    TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_0));
+    IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -358,13 +361,12 @@ public class IcebergDatasetTest {
    */
   protected IcebergTable 
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> 
sourceSnapshotPathSets,
       Optional<List<String>> optExistingSourcePaths, List<String> 
existingDestPaths, Set<Path> expectedResultPaths) throws IOException {
-    IcebergTable icebergTable = 
MockIcebergTable.withSnapshots(sourceSnapshotPathSets);
+    IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), 
sourceSnapshotPathSets);
 
     MockFileSystemBuilder sourceFsBuilder = new 
MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent());
     optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
     FileSystem sourceFs = sourceFsBuilder.build();
-    IcebergDataset icebergDataset =
-        new IcebergDataset(testDbName, testTblName, icebergTable, null, new 
Properties(), sourceFs);
+    IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, 
new Properties(), sourceFs);
 
     MockFileSystemBuilder destFsBuilder = new 
MockFileSystemBuilder(DEST_FS_URI);
     destFsBuilder.addPaths(existingDestPaths);
@@ -377,7 +379,7 @@ public class IcebergDatasetTest {
     Assert.assertEquals(
         
filePathsToFileStatus.values().stream().map(FileStatus::getPath).collect(Collectors.toSet()),
         expectedResultPaths);
-    return icebergTable;
+    return srcIcebergTable;
   }
 
   /** @return `paths` after adding to it all paths of every one of 
`snapshotDefs` */
@@ -464,9 +466,9 @@ public class IcebergDatasetTest {
    *  Without this, so to lose the mock, we'd be unable to set up any source 
paths as existing.
    */
   protected static class TrickIcebergDataset extends IcebergDataset {
-    public TrickIcebergDataset(String db, String table, IcebergTable 
srcIcebergTbl, IcebergTable destIcebergTbl, Properties properties,
+    public TrickIcebergDataset(IcebergTable srcIcebergTable, IcebergTable 
destIcebergTable, Properties properties,
         FileSystem sourceFs) {
-      super(db, table, srcIcebergTbl, destIcebergTbl, properties, sourceFs);
+      super(srcIcebergTable, destIcebergTable, properties, sourceFs);
     }
 
     @Override // as the `static` is not mock-able
@@ -581,8 +583,9 @@ public class IcebergDatasetTest {
       }
     }
 
-    public static IcebergTable withSnapshots(List<SnapshotPaths> 
snapshotPathSets) throws IOException {
+    public static IcebergTable withSnapshots(TableIdentifier tableId, 
List<SnapshotPaths> snapshotPathSets) throws IOException {
       IcebergTable table = Mockito.mock(IcebergTable.class);
+      Mockito.when(table.getTableId()).thenReturn(tableId);
       int lastIndex = snapshotPathSets.size() - 1;
       Mockito.when(table.getCurrentSnapshotInfoOverviewOnly())
           
.thenReturn(snapshotPathSets.get(lastIndex).asSnapshotInfo(lastIndex));

Reply via email to