This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f6f1bc51d [GOBBLIN-1994] Ensure iceberg-distcp consistency by using same `TableMetadata` for both WU planning and final commit (#3870)
f6f1bc51d is described below

commit f6f1bc51d3fc020add7e7a1ee6760432f197870d
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Jan 31 13:47:15 2024 -0800

    [GOBBLIN-1994] Ensure iceberg-distcp consistency by using same `TableMetadata` for both WU planning and final commit (#3870)
    
    Ensure iceberg-distcp consistency by using same `TableMetadata` for both WU planning and final commit
---
 .../management/copy/iceberg/IcebergDataset.java    | 72 +++++++++++++++++++---
 .../copy/iceberg/IcebergDatasetFinder.java         |  2 +-
 .../copy/iceberg/IcebergRegisterStep.java          | 35 ++++++++---
 .../copy/iceberg/IcebergSnapshotInfo.java          |  6 +-
 .../data/management/copy/iceberg/IcebergTable.java | 16 +++--
 .../copy/iceberg/IcebergDatasetTest.java           | 13 +++-
 6 files changed, 115 insertions(+), 29 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index dbd4f780a..9e6364c32 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -40,9 +41,12 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.iceberg.TableMetadata;
+
 import org.apache.gobblin.data.management.copy.CopyConfiguration;
 import org.apache.gobblin.data.management.copy.CopyEntity;
 import org.apache.gobblin.data.management.copy.CopyableDataset;
@@ -101,6 +105,7 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
   public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs, 
CopyConfiguration configuration) {
     return createFileSets(targetFs, configuration);
   }
+
   /**
    * Finds all files read by the table and generates CopyableFiles.
    * For the specific semantics see {@link #createFileSets}.
@@ -126,6 +131,14 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
     return Iterators.singletonIterator(fileSet);
   }
 
+  /** Conveys {@link Path}-to-{@link FileStatus} mapping in combination with 
{@link TableMetadata} originating from the same atomic read of those paths */
+  @Data
+  @VisibleForTesting
+  protected static final class GetFilePathsToFileStatusResult {
+    private final Map<Path, FileStatus> pathsToFileStatus;
+    private final TableMetadata tableMetadata;
+  }
+
   /**
    * Finds all files, data and metadata, as {@link CopyEntity}s that comprise 
the table and fully specify remaining
    * table replication.
@@ -134,7 +147,9 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
   Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, 
CopyConfiguration copyConfig) throws IOException {
     String fileSet = this.getFileSetId();
     List<CopyEntity> copyEntities = Lists.newArrayList();
-    Map<Path, FileStatus> pathToFileStatus = 
getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath);
+    TableMetadata destTableMetadataBeforeSrcRead = 
getCurrentDestTableMetadata();
+    GetFilePathsToFileStatusResult atomicGetPathsResult = 
getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath);
+    Map<Path, FileStatus> pathToFileStatus = 
atomicGetPathsResult.getPathsToFileStatus();
     log.info("~{}~ found {} candidate source paths", fileSet, 
pathToFileStatus.size());
 
     Configuration defaultHadoopConfiguration = new Configuration();
@@ -159,18 +174,32 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
     }
-    // TODO: Filter properties specific to iceberg registration and avoid 
serializing every global property
-    copyEntities.add(createPostPublishStep());
+
+    
copyEntities.add(createPostPublishStep(atomicGetPathsResult.getTableMetadata(), 
destTableMetadataBeforeSrcRead));
     log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
     return copyEntities;
   }
 
   /**
-   * Finds all files of the Iceberg's current snapshot
+   * @return {@link TableMetadata} current at destination
+   * @throws IOException if `this.destIcebergTable` does not exist
+   */
+  protected TableMetadata getCurrentDestTableMetadata() throws IOException {
+    try {
+      return destIcebergTable.accessTableMetadata();
+    } catch (IcebergTable.TableNotFoundException tnfe) {
+      log.error("No destination TableMetadata because table not found: '" + 
destIcebergTable.getTableId() + "'" , tnfe);
+      throw new IOException(tnfe);
+    }
+  }
+
+  /**
+   * Finds all files of the Iceberg's current snapshot and also returns the 
{@link TableMetadata} atomically accessed while reading those paths
    * @param shouldIncludeMetadataPath whether to consider "metadata.json" 
(`getMetadataPath()`) as eligible for inclusion
-   * @return a map of path, file status for each file that needs to be copied
+   * @return combined result: both a map of path to file status for each file 
to copy plus the {@link TableMetadata}, atomically observed with those paths
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem 
targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath) 
throws IOException {
+  protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem 
targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath)
+      throws IOException {
     IcebergTable icebergTable = this.getSrcIcebergTable();
     /** @return whether `pathStr` is present on `targetFs`, caching results 
while tunneling checked exceptions outward */
     Function<String, Boolean> isPresentOnTarget = 
CheckedExceptionFunction.wrapToTunneled(pathStr ->
@@ -188,11 +217,19 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
           this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
           currentSnapshotOverview.getManifestListPath(),
           currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: 
MISSING!>>"));
-      return Maps.newHashMap();
+      TableMetadata readTimeTableMetadata = 
currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new 
RuntimeException(
+          String.format("~%s~ no table metadata for current snapshot '%s' at 
'%s' with metadata path '%s'",
+              this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
+              currentSnapshotOverview.getManifestListPath(),
+              currentSnapshotOverview.getMetadataPath().orElse("<<ERROR: 
MISSING!>>"))));
+      return new GetFilePathsToFileStatusResult(Maps.newHashMap(), 
readTimeTableMetadata);
     }
+
+    List<TableMetadata> readTimeTableMetadataHolder = Lists.newArrayList(); // 
expecting exactly one elem
     Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos = 
icebergTable.getIncrementalSnapshotInfosIterator();
     Iterator<String> filePathsIterator = Iterators.concat(
         Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
+          
snapshotInfo.getTableMetadata().ifPresent(readTimeTableMetadataHolder::add);
           // log each snapshot, for context, in case of 
`FileNotFoundException` during `FileSystem.getFileStatus()`
           String manListPath = snapshotInfo.getManifestListPath();
           log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path: 
'{}'", this.getFileSetId(),
@@ -276,7 +313,20 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
     } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
       wrapper.rethrowWrapped();
     }
-    return results;
+
+    if (readTimeTableMetadataHolder.size() != 1) {
+      final int firstNumToShow = 5;
+      String newline = System.lineSeparator();
+      String errMsg = readTimeTableMetadataHolder.size() == 0
+          ? String.format("~%s~ no table metadata ever encountered!", 
this.getFileSetId())
+          : String.format("~%s~ multiple metadata (%d) encountered (exactly 1 
expected) - first %d: [%s]",
+              this.getFileSetId(), readTimeTableMetadataHolder.size(), 
firstNumToShow,
+              
readTimeTableMetadataHolder.stream().limit(firstNumToShow).map(md ->
+                  md.uuid() + " - " + md.metadataFileLocation()
+              ).collect(Collectors.joining(newline, newline, newline)));
+      throw new RuntimeException(errMsg);
+    }
+    return new GetFilePathsToFileStatusResult(results, 
readTimeTableMetadataHolder.get(0));
   }
 
   /**
@@ -329,8 +379,10 @@ public class IcebergDataset implements 
PrioritizedCopyableDataset {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep() {
-    IcebergRegisterStep icebergRegisterStep = new 
IcebergRegisterStep(this.srcIcebergTable.getTableId(), 
this.destIcebergTable.getTableId(), this.properties);
+  private PostPublishStep createPostPublishStep(TableMetadata 
readTimeSrcTableMetadata, TableMetadata justPriorDestTableMetadata) {
+    // TODO: Filter properties specific to iceberg registration and avoid 
serializing every global property
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(
+        this.srcIcebergTable.getTableId(), this.destIcebergTable.getTableId(), 
readTimeSrcTableMetadata, justPriorDestTableMetadata, this.properties);
     return new PostPublishStep(getFileSetId(), Maps.newHashMap(), 
icebergRegisterStep, 0);
   }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index 080d8e91d..f6668f5d1 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -53,7 +53,7 @@ import org.apache.gobblin.util.HadoopUtils;
 public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDataset> {
   public static final String ICEBERG_DATASET_PREFIX = 
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
 
-  public static final String ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH = 
ICEBERG_DATASET_PREFIX + ".copy.metadata.path";
+  public static final String ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH = 
ICEBERG_DATASET_PREFIX + ".should.copy.metadata.path";
   public static final String DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH 
= "false";
 
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index 8beb72e66..7345e9a00 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -29,19 +29,32 @@ import org.apache.gobblin.commit.CommitStep;
 
 
 /**
- * {@link CommitStep} to perform Iceberg registration.
+ * {@link CommitStep} to perform Iceberg registration.  It is critically 
important to use the same source-side {@link TableMetadata} observed while
+ * listing the source table and the dest-side {@link TableMetadata} observed 
just prior to that listing of the source table.  Either table may have
+ * changed between first calculating the source-to-dest difference and now 
performing the commit on the destination (herein).  Accordingly, use of
+ * now-current metadata could thwart consistency.  Only metadata preserved 
from the time of the difference calc guarantees correctness.
+ *
+ *   - if the source table has since changed, we nonetheless use the metadata 
originally observed, since now-current metadata wouldn't match the
+ *     files just copied to dest
+ *   - if the dest table has since changed, we reject the commit altogether to 
force the diff calc to re-start again (in a subsequent execution)
  */
 @Slf4j
 public class IcebergRegisterStep implements CommitStep {
 
   // store as string for serializability... TODO: explore whether truly 
necessary (or we could just as well store as `TableIdentifier`)
-  private final String srcTableIdStr;
+  private final String srcTableIdStr; // used merely for naming within trace 
logging
   private final String destTableIdStr;
+  private final TableMetadata readTimeSrcTableMetadata;
+  private final TableMetadata justPriorDestTableMetadata;
   private final Properties properties;
 
-  public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier 
destTableId, Properties properties) {
+  public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier 
destTableId,
+      TableMetadata readTimeSrcTableMetadata, TableMetadata 
justPriorDestTableMetadata,
+      Properties properties) {
     this.srcTableIdStr = srcTableId.toString();
     this.destTableIdStr = destTableId.toString();
+    this.readTimeSrcTableMetadata = readTimeSrcTableMetadata;
+    this.justPriorDestTableMetadata = justPriorDestTableMetadata;
     this.properties = properties;
   }
 
@@ -52,18 +65,24 @@ public class IcebergRegisterStep implements CommitStep {
 
   @Override
   public void execute() throws IOException {
-    IcebergTable srcIcebergTable = 
IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.SOURCE)
-        .openTable(TableIdentifier.parse(srcTableIdStr));
     IcebergTable destIcebergTable = 
IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION)
         .openTable(TableIdentifier.parse(destTableIdStr));
-    TableMetadata destinationMetadata = null;
+    // NOTE: solely by-product of probing table's existence: metadata recorded 
just prior to reading from source catalog is what's actually used
+    TableMetadata unusedNowCurrentDestMetadata = null;
     try {
-      destinationMetadata = destIcebergTable.accessTableMetadata(); // 
probe... (first access could throw)
+      unusedNowCurrentDestMetadata = destIcebergTable.accessTableMetadata(); 
// probe... (first access could throw)
+      log.info("~{}~ (destination) (using) TableMetadata: {} - {} {}= 
(current) TableMetadata: {} - {}",
+          destTableIdStr,
+          justPriorDestTableMetadata.uuid(), 
justPriorDestTableMetadata.metadataFileLocation(),
+          
unusedNowCurrentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid()) ? 
"=" : "!",
+          unusedNowCurrentDestMetadata.uuid(), 
unusedNowCurrentDestMetadata.metadataFileLocation());
     } catch (IcebergTable.TableNotFoundException tnfe) {
       log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
     }
-    
destIcebergTable.registerIcebergTable(srcIcebergTable.accessTableMetadata(), 
destinationMetadata);
+    // TODO: decide whether helpful to construct a more detailed error message 
about `justPriorDestTableMetadata` being no-longer current
+    destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
justPriorDestTableMetadata);
   }
+
   @Override
   public String toString() {
     return String.format("Registering Iceberg Table: {%s} (dest); (src: 
{%s})", this.destTableIdStr, this.srcTableIdStr);
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index dc97b9ed6..88c7f3ad5 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -27,6 +27,8 @@ import lombok.Data;
 
 import com.google.common.collect.Lists;
 
+import org.apache.iceberg.TableMetadata;
+
 
 /**
  * Information about the metadata file and data file paths of a single Iceberg 
Snapshot.
@@ -43,8 +45,10 @@ public class IcebergSnapshotInfo {
 
   private final Long snapshotId;
   private final Instant timestamp;
-  /** only for the current snapshot, being whom the metadata file 'belongs 
to'; `isEmpty()` for all other snapshots */
+  /** only for the snapshot designated 'current', being whom the metadata file 
'belongs to'; `isEmpty()` for all other snapshots */
   private final Optional<String> metadataPath;
+  /** only for the snapshot designated 'current', being whom the metadata 
'belong to'; `isEmpty()` for all other snapshots */
+  private final Optional<TableMetadata> tableMetadata;
   private final String manifestListPath;
   private final List<ManifestFileInfo> manifestFiles;
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 245298bcb..1a56a8d93 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -86,13 +86,13 @@ public class IcebergTable {
   /** @return metadata info limited to the most recent (current) snapshot */
   public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
     TableMetadata current = accessTableMetadata();
-    return createSnapshotInfo(current.currentSnapshot(), 
Optional.of(current.metadataFileLocation()));
+    return createSnapshotInfo(current.currentSnapshot(), 
Optional.of(current.metadataFileLocation()), Optional.of(current));
   }
 
   /** @return metadata info for most recent snapshot, wherein manifests and 
their child data files ARE NOT listed */
   public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws 
IOException {
     TableMetadata current = accessTableMetadata();
-    return createSnapshotInfo(current.currentSnapshot(), 
Optional.of(current.metadataFileLocation()), true);
+    return createSnapshotInfo(current.currentSnapshot(), 
Optional.of(current.metadataFileLocation()), Optional.of(current), true);
   }
 
   /** @return metadata info for all known snapshots, ordered historically, 
with *most recent last* */
@@ -104,7 +104,8 @@ public class IcebergTable {
       try {
         return IcebergTable.this.createSnapshotInfo(
             snapshot,
-            currentSnapshotId == snapshot.snapshotId() ? 
Optional.of(current.metadataFileLocation()) : Optional.empty()
+            currentSnapshotId == snapshot.snapshotId() ? 
Optional.of(current.metadataFileLocation()) : Optional.empty(),
+            currentSnapshotId == snapshot.snapshotId() ? Optional.of(current) 
: Optional.empty()
         );
       } catch (IOException e) {
         throw new RuntimeException(e);
@@ -160,16 +161,19 @@ public class IcebergTable {
     return Optional.ofNullable(current).orElseThrow(() -> new 
TableNotFoundException(this.tableId));
   }
 
-  protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, 
Optional<String> metadataFileLocation) throws IOException {
-    return createSnapshotInfo(snapshot, metadataFileLocation, false);
+  protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, 
Optional<String> metadataFileLocation, Optional<TableMetadata> 
currentTableMetadata)
+      throws IOException {
+    return createSnapshotInfo(snapshot, metadataFileLocation, 
currentTableMetadata, false);
   }
 
-  protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, 
Optional<String> metadataFileLocation, boolean skipManifestFileInfo) throws 
IOException {
+  protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot, 
Optional<String> metadataFileLocation, Optional<TableMetadata> 
currentTableMetadata,
+      boolean skipManifestFileInfo) throws IOException {
     // TODO: verify correctness, even when handling 'delete manifests'!
     return new IcebergSnapshotInfo(
         snapshot.snapshotId(),
         Instant.ofEpochMilli(snapshot.timestampMillis()),
         metadataFileLocation,
+        currentTableMetadata,
         snapshot.manifestListLocation(),
         // NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m, 
tableOps.io()))` due to checked exception
         skipManifestFileInfo ? Lists.newArrayList() : 
calcAllManifestFileInfos(snapshot.allManifests(tableOps.io()), tableOps.io())
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 1f23f7428..c5e20ed72 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
@@ -398,7 +400,8 @@ public class IcebergDatasetTest {
     FileSystem destFs = destFsBuilder.build();
     CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
 
-    Map<Path, FileStatus> filePathsToFileStatus = 
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration, 
shouldIncludeMetadataPath);
+    IcebergDataset.GetFilePathsToFileStatusResult pathsResult = 
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration, 
shouldIncludeMetadataPath);
+    Map<Path, FileStatus> filePathsToFileStatus = 
pathsResult.getPathsToFileStatus();
     Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths);
     // verify solely the path portion of the `FileStatus`, since that's all 
mock sets up
     Assert.assertEquals(
@@ -596,6 +599,8 @@ public class IcebergDatasetTest {
       private final String manifestListPath;
       private final List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
 
+      private static final TableMetadata unusedStubMetadata = 
Mockito.mock(TableMetadata.class);
+
       public IcebergSnapshotInfo asSnapshotInfo() {
         return asSnapshotInfo(0L);
       }
@@ -606,7 +611,9 @@ public class IcebergDatasetTest {
       }
 
       public IcebergSnapshotInfo asSnapshotInfo(Long snapshotId, Instant 
timestamp) {
-        return new IcebergSnapshotInfo(snapshotId, timestamp, 
this.metadataPath, this.manifestListPath, this.manifestFiles);
+        return new IcebergSnapshotInfo(snapshotId, timestamp, 
this.metadataPath,
+            this.metadataPath.map(ignore -> unusedStubMetadata), // only set 
when `metadataPath.isPresent()`
+            this.manifestListPath, this.manifestFiles);
       }
     }
 

Reply via email to