This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3061c7048 Add config for excluding 'manifest.json' in iceberg-distcp data movement (#3869)
3061c7048 is described below

commit 3061c7048840fab0eccff2a42ef303b500e0f20c
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Jan 29 13:27:02 2024 -0800

    Add config for excluding 'manifest.json' in iceberg-distcp data movement (#3869)
---
 .../management/copy/iceberg/IcebergDataset.java    | 28 ++++---
 .../copy/iceberg/IcebergDatasetFinder.java         | 12 ++-
 .../copy/iceberg/IcebergSnapshotInfo.java          | 10 +--
 .../copy/iceberg/IcebergDatasetTest.java           | 91 ++++++++++++++--------
 .../management/copy/iceberg/IcebergTableTest.java  |  8 +-
 .../service/modules/orchestration/DagManager.java  |  2 +-
 .../modules/orchestration/FlowTriggerHandler.java  |  7 +-
 7 files changed, 102 insertions(+), 56 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 67a10a42d..dbd4f780a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -68,16 +68,15 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   private final IcebergTable destIcebergTable;
   protected final Properties properties;
   protected final FileSystem sourceFs;
+  protected final boolean shouldIncludeMetadataPath;
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make 
parameterizable, if desired
 
-  /** Destination database name */
-  public static final String DESTINATION_DATABASE_KEY = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database";
-
-  public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable 
destIcebergTable, Properties properties, FileSystem sourceFs) {
+  public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable 
destIcebergTable, Properties properties, FileSystem sourceFs, boolean 
shouldIncludeMetadataPath) {
     this.srcIcebergTable = srcIcebergTable;
     this.destIcebergTable = destIcebergTable;
     this.properties = properties;
     this.sourceFs = sourceFs;
+    this.shouldIncludeMetadataPath = shouldIncludeMetadataPath;
   }
 
   @Override
@@ -135,7 +134,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, 
CopyConfiguration copyConfig) throws IOException {
     String fileSet = this.getFileSetId();
     List<CopyEntity> copyEntities = Lists.newArrayList();
-    Map<Path, FileStatus> pathToFileStatus = 
getFilePathsToFileStatus(targetFs, copyConfig);
+    Map<Path, FileStatus> pathToFileStatus = 
getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath);
     log.info("~{}~ found {} candidate source paths", fileSet, 
pathToFileStatus.size());
 
     Configuration defaultHadoopConfiguration = new Configuration();
@@ -168,9 +167,10 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
 
   /**
    * Finds all files of the Iceberg's current snapshot
+   * @param shouldIncludeMetadataPath whether to consider "metadata.json" 
(`getMetadataPath()`) as eligible for inclusion
    * @return a map of path, file status for each file that needs to be copied
    */
-  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem 
targetFs, CopyConfiguration copyConfig) throws IOException {
+  protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem 
targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath) 
throws IOException {
     IcebergTable icebergTable = this.getSrcIcebergTable();
     /** @return whether `pathStr` is present on `targetFs`, caching results 
while tunneling checked exceptions outward */
     Function<String, Boolean> isPresentOnTarget = 
CheckedExceptionFunction.wrapToTunneled(pathStr ->
@@ -180,6 +180,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
     );
 
     // check first for case of nothing to replicate, to avoid needless 
scanning of a potentially massive iceberg
+    // NOTE: if `shouldIncludeMetadataPath` was false during the prior 
executions, this condition will be false
     IcebergSnapshotInfo currentSnapshotOverview = 
icebergTable.getCurrentSnapshotInfoOverviewOnly();
     if 
(currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false) 
&&
         
isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
@@ -207,7 +208,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
           // some of the children pointed to within could have been copied 
prior, when they previously appeared as a
           // child of the current file's predecessor (which this new meta file 
now replaces).
           if (!isPresentOnTarget.apply(manListPath)) {
-            List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+            List<String> missingPaths = 
snapshotInfo.getSnapshotApexPaths(shouldIncludeMetadataPath);
             for (IcebergSnapshotInfo.ManifestFileInfo mfi : 
snapshotInfo.getManifestFiles()) {
               if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
                 missingPaths.add(mfi.getManifestFilePath());
@@ -226,13 +227,20 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
           } else {
             log.info("~{}~ snapshot '{}' already present on target... skipping 
(including contents)",
                 this.getFileSetId(), snapshotInfo.getSnapshotId());
-            // IMPORTANT: separately consider metadata path, to handle case of 
'metadata-only' snapshot reusing mf-list
+            // IMPORTANT: separately consider metadata path, to handle case of 
'metadata-only' snapshot reusing manifest list
             Optional<String> metadataPath = snapshotInfo.getMetadataPath();
             Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p 
-> !isPresentOnTarget.apply(p));
             metadataPath.ifPresent(ignore ->
-                log.info("~{}~ metadata IS {} already present on target", 
this.getFileSetId(), nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
+                log.info("~{}~ metadata IS {} present on target",
+                    this.getFileSetId(),
+                    !nonReplicatedMetadataPath.isPresent()
+                        ? "already"
+                        : shouldIncludeMetadataPath ? "NOT YET" : "NOT CHOSEN 
TO BE")
             );
-            return nonReplicatedMetadataPath.map(p -> 
Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+            return nonReplicatedMetadataPath
+                .filter(ignore -> shouldIncludeMetadataPath)
+                .map(p -> Lists.newArrayList(p).iterator())
+                .orElse(Collections.emptyIterator());
           }
         })
     );
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index dc407f38c..080d8e91d 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
+import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigValue;
 
@@ -52,6 +52,10 @@ import org.apache.gobblin.util.HadoopUtils;
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDataset> {
   public static final String ICEBERG_DATASET_PREFIX = 
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
+
+  public static final String ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH = 
ICEBERG_DATASET_PREFIX + ".copy.metadata.path";
+  public static final String DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH 
= "false";
+
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
   public static final String ICEBERG_CATALOG_KEY = "catalog";
   /**
@@ -149,7 +153,7 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
     IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(destDbName, destTableName);
     // TODO: Rethink strategy to enforce dest iceberg table
     
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, 
destTableName));
-    return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, 
fs);
+    return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, 
fs, getConfigShouldCopyMetadataPath(properties));
   }
 
   protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
@@ -161,6 +165,10 @@ public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDatase
     return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
   }
 
+  protected static boolean getConfigShouldCopyMetadataPath(Properties 
properties) {
+    return 
Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH,
 DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH));
+  }
+
   /** @return property value or `null` */
   protected static String getLocationQualifiedProperty(Properties properties, 
CatalogLocation location, String relativePropName) {
     return properties.getProperty(calcLocationQualifiedPropName(location, 
relativePropName));
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index 42b871302..dc97b9ed6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -56,15 +56,15 @@ public class IcebergSnapshotInfo {
     return 
manifestFiles.stream().map(ManifestFileInfo::getListedFilePaths).flatMap(List::stream).collect(Collectors.toList());
   }
 
-  /** @return the `manifestListPath` and `metadataPath`, if present */
-  public List<String> getSnapshotApexPaths() {
-    List<String> result = 
metadataPath.map(Lists::newArrayList).orElse(Lists.newArrayList());
+  /** @return the `manifestListPath` and, if `shouldIncludeMetadataPath` and 
it is present, also `metadataPath` */
+  public List<String> getSnapshotApexPaths(boolean shouldIncludeMetadataPath) {
+    List<String> result = metadataPath.map(Lists::newArrayList).filter(ignore 
-> shouldIncludeMetadataPath).orElse(Lists.newArrayList());
     result.add(manifestListPath);
     return result;
   }
 
-  public List<String> getAllPaths() {
-    List<String> result = getSnapshotApexPaths();
+  public List<String> getAllPaths(boolean shouldIncludeMetadataPath) {
+    List<String> result = getSnapshotApexPaths(shouldIncludeMetadataPath);
     result.addAll(getManifestFilePaths());
     result.addAll(getAllDataFilePaths());
     return result;
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index a005eb315..1f23f7428 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -51,6 +51,7 @@ import org.testng.collections.Sets;
 
 import com.google.api.client.util.Maps;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Streams;
 import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
@@ -130,36 +131,40 @@ public class IcebergDatasetTest {
   public void testGetFilePathsWhenDestEmpty() throws IOException {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     List<String> existingDestPaths = Lists.newArrayList();
-    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
-    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths);
+    boolean shouldIncludeMetadataPath = true;
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
shouldIncludeMetadataPath, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths, shouldIncludeMetadataPath);
   }
 
   @Test
   public void testGetFilePathsWhenOneManifestListAtDest() throws IOException {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
-    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
SNAPSHOT_PATHS_0);
-    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths);
+    boolean shouldIncludeMetadataPath = true;
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
shouldIncludeMetadataPath, SNAPSHOT_PATHS_0);
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths, shouldIncludeMetadataPath);
   }
 
   @Test
   public void testGetFilePathsWhenOneManifestAtDest() throws IOException {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     List<String> existingDestPaths = Lists.newArrayList(MANIFEST_PATH_1);
-    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
SNAPSHOT_PATHS_0);
+    boolean shouldIncludeMetadataPath = false;
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
shouldIncludeMetadataPath, SNAPSHOT_PATHS_0);
     expectedResultPaths.add(new Path(MANIFEST_LIST_PATH_1)); // expect 
manifest's parent, despite manifest subtree skip
-    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths);
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths, shouldIncludeMetadataPath);
   }
 
   @Test
   public void testGetFilePathsWhenSomeDataFilesAtDest() throws IOException {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     List<String> existingDestPaths = Lists.newArrayList(MANIFEST_DATA_PATH_1B, 
MANIFEST_DATA_PATH_0A);
-    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    boolean shouldIncludeMetadataPath = true;
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
shouldIncludeMetadataPath, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     // despite already existing on target, expect anyway: per-file check 
skipped for optimization's sake
     // expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_1B));
     // expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_0A));
-    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths);
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths, shouldIncludeMetadataPath);
   }
 
   @Test
@@ -167,32 +172,44 @@ public class IcebergDatasetTest {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     // pretend this path doesn't exist on source:
     Path missingPath = new Path(MANIFEST_DATA_PATH_0A);
-    Set<Path> existingSourcePaths = withAllSnapshotPaths(Sets.newHashSet(), 
SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    boolean shouldIncludeMetadataPath = false;
+    Set<Path> existingSourcePaths = withAllSnapshotPaths(Sets.newHashSet(), 
shouldIncludeMetadataPath, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     existingSourcePaths.remove(missingPath);
     List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
-    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
SNAPSHOT_PATHS_0);
+    Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(), 
shouldIncludeMetadataPath, SNAPSHOT_PATHS_0);
     expectedResultPaths.remove(missingPath);
     validateGetFilePathsGivenDestState(icebergSnapshots,
         
Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
 existingDestPaths,
-        expectedResultPaths);
+        expectedResultPaths, shouldIncludeMetadataPath);
   }
 
   @Test
   public void testGetFilePathsWhenManifestListsAtDestButNotMetadata() throws 
IOException {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1, 
MANIFEST_LIST_PATH_0);
+    boolean shouldIncludeMetadataPath = true;
     Set<Path> expectedResultPaths = Sets.newHashSet();
     expectedResultPaths.add(new Path(METADATA_PATH));
-    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths);
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths, shouldIncludeMetadataPath);
+  }
+
+  @Test
+  public void 
testGetFilePathsWhenManifestListsAtDestButNotMetadataYetThatIgnored() throws 
IOException {
+    List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+    List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1, 
MANIFEST_LIST_PATH_0);
+    boolean shouldIncludeMetadataPath = false;
+    Set<Path> expectedResultPaths = Sets.newHashSet(); // nothing expected!
+    validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths, 
expectedResultPaths, shouldIncludeMetadataPath);
   }
 
   @Test
   public void testGetFilePathsWhenAllAtDest() throws IOException {
     List<MockIcebergTable.SnapshotPaths> icebergSnapshots = 
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
     List<String> existingDestPaths = Lists.newArrayList(METADATA_PATH, 
MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
+    boolean shouldIncludeMetadataPath = true;
     Set<Path> expectedResultPaths = Sets.newHashSet(); // not expecting any 
delta
     IcebergTable mockTable =
-        validateGetFilePathsGivenDestState(icebergSnapshots, 
existingDestPaths, expectedResultPaths);
+        validateGetFilePathsGivenDestState(icebergSnapshots, 
existingDestPaths, expectedResultPaths, shouldIncludeMetadataPath);
     // ensure short-circuiting was able to avert iceberg manifests scan
     Mockito.verify(mockTable, 
Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
     Mockito.verify(mockTable, Mockito.times(1)).getTableId();
@@ -206,13 +223,14 @@ public class IcebergDatasetTest {
 
     MockFileSystemBuilder sourceFsBuilder = new 
MockFileSystemBuilder(SRC_FS_URI);
     FileSystem sourceFs = sourceFsBuilder.build();
-    IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, 
new Properties(), sourceFs);
+    boolean shouldIncludeMetadataPathMakesNoDifference = true;
+    IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, 
new Properties(), sourceFs, shouldIncludeMetadataPathMakesNoDifference);
 
     MockFileSystemBuilder destFsBuilder = new 
MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destFsBuilder.build();
     Mockito.doThrow(new IOException("Ha - not so 
fast!")).when(destFs).getFileStatus(new 
Path(SNAPSHOT_PATHS_0.manifestListPath));
     CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
-    icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+    icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration, 
shouldIncludeMetadataPathMakesNoDifference);
   }
 
   /** Validate error consolidation used to streamline logging. */
@@ -247,7 +265,8 @@ public class IcebergDatasetTest {
     TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
     IcebergTable srcIcebergTbl = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_0));
     IcebergTable destIcebergTbl = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, 
destIcebergTbl, new Properties(), sourceFs);
+    boolean shouldIncludeManifestPath = true;
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl, 
destIcebergTbl, new Properties(), sourceFs, shouldIncludeManifestPath);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -262,7 +281,7 @@ public class IcebergDatasetTest {
   /** Test generating copy entities for a multi-snapshot iceberg; given empty 
dest, src-dest delta will be entirety */
   @Test
   public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws 
IOException {
-    List<String> expectedPaths = Arrays.asList(METADATA_PATH,
+    List<String> expectedPaths = Arrays.asList( // METADATA_PATH,
         MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A, 
MANIFEST_DATA_PATH_0B,
         MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A, 
MANIFEST_DATA_PATH_1B);
 
@@ -273,7 +292,8 @@ public class IcebergDatasetTest {
     TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
     IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1, 
SNAPSHOT_PATHS_0));
     IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
+    boolean shouldIncludeManifestPath = false;
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -304,7 +324,8 @@ public class IcebergDatasetTest {
     TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
     IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_0));
     IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
+    boolean shouldIncludeManifestPath = true;
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -333,7 +354,8 @@ public class IcebergDatasetTest {
     TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName, 
testTblName);
     IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_0));
     IcebergTable destIcebergTable = 
MockIcebergTable.withSnapshots(tableIdInCommon, 
Arrays.asList(SNAPSHOT_PATHS_1));
-    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs);
+    boolean shouldIncludeManifestPath = true;
+    IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable, 
destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath);
 
     MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
     FileSystem destFs = destBuilder.build();
@@ -352,9 +374,9 @@ public class IcebergDatasetTest {
    *  @return {@link IcebergTable} (mock!), for behavioral verification
    */
   protected IcebergTable 
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> 
sourceSnapshotPathSets,
-      List<String> existingDestPaths, Set<Path> expectedResultPaths) throws 
IOException {
+      List<String> existingDestPaths, Set<Path> expectedResultPaths, boolean 
shouldIncludeMetadataPath) throws IOException {
     return validateGetFilePathsGivenDestState(sourceSnapshotPathSets, 
Optional.empty(), existingDestPaths,
-        expectedResultPaths);
+        expectedResultPaths, shouldIncludeMetadataPath);
   }
 
   /**
@@ -362,20 +384,21 @@ public class IcebergDatasetTest {
    *  @return {@link IcebergTable} (mock!), for behavioral verification
    */
   protected IcebergTable 
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths> 
sourceSnapshotPathSets,
-      Optional<List<String>> optExistingSourcePaths, List<String> 
existingDestPaths, Set<Path> expectedResultPaths) throws IOException {
+      Optional<List<String>> optExistingSourcePaths, List<String> 
existingDestPaths, Set<Path> expectedResultPaths,
+      boolean shouldIncludeMetadataPath) throws IOException {
     IcebergTable srcIcebergTable = 
MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName), 
sourceSnapshotPathSets);
 
     MockFileSystemBuilder sourceFsBuilder = new 
MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent());
     optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
     FileSystem sourceFs = sourceFsBuilder.build();
-    IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, 
new Properties(), sourceFs);
+    IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null, 
new Properties(), sourceFs, shouldIncludeMetadataPath);
 
     MockFileSystemBuilder destFsBuilder = new 
MockFileSystemBuilder(DEST_FS_URI);
     destFsBuilder.addPaths(existingDestPaths);
     FileSystem destFs = destFsBuilder.build();
     CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
 
-    Map<Path, FileStatus> filePathsToFileStatus = 
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+    Map<Path, FileStatus> filePathsToFileStatus = 
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration, 
shouldIncludeMetadataPath);
     Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths);
     // verify solely the path portion of the `FileStatus`, since that's all 
mock sets up
     Assert.assertEquals(
@@ -385,9 +408,9 @@ public class IcebergDatasetTest {
   }
 
   /** @return `paths` after adding to it all paths of every one of 
`snapshotDefs` */
-  protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, 
MockIcebergTable.SnapshotPaths... snapshotDefs) {
+  protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, boolean 
shouldIncludeMetadataPath, MockIcebergTable.SnapshotPaths... snapshotDefs) {
     Arrays.stream(snapshotDefs).flatMap(snapshotDef ->
-            snapshotDef.asSnapshotInfo().getAllPaths().stream())
+            
snapshotDef.asSnapshotInfo().getAllPaths(shouldIncludeMetadataPath).stream())
         .forEach(p ->
             paths.add(new Path(p))
         );
@@ -464,16 +487,18 @@ public class IcebergDatasetTest {
   }
 
   /**
-   *  Sadly, this is needed to avoid losing `FileSystem` mock to replacement 
from the `FileSystem.get` `static`
-   *  Without this, so to lose the mock, we'd be unable to set up any source 
paths as existing.
+   *  Without this, our {@link FileSystem} mock would be lost by replacement 
from the {@link FileSystem#get} static, and
+   *  that would prevent tests from effectively setting up certain source 
paths as existing.
+   *  Instead override {@link 
IcebergDataset#getSourceFileSystemFromFileStatus(FileStatus, Configuration)} so 
that static
+   *  is avoided entirely.
    */
   protected static class TrickIcebergDataset extends IcebergDataset {
     public TrickIcebergDataset(IcebergTable srcIcebergTable, IcebergTable 
destIcebergTable, Properties properties,
-        FileSystem sourceFs) {
-      super(srcIcebergTable, destIcebergTable, properties, sourceFs);
+        FileSystem sourceFs, boolean shouldIncludeManifestPath) {
+      super(srcIcebergTable, destIcebergTable, properties, sourceFs, 
shouldIncludeManifestPath);
     }
 
-    @Override // as the `static` is not mock-able
+    @Override // as the `static` itself is not directly mock-able
     protected FileSystem getSourceFileSystemFromFileStatus(FileStatus 
fileStatus, Configuration hadoopConfig) throws IOException {
       return this.sourceFs;
     }
@@ -604,7 +629,7 @@ public class IcebergDatasetTest {
     /** @return {@link Stream} equivalent of `inputs.zipWithIndex.map(f)` in 
scala */
     public static <T, R> Stream<R> transformWithIndex(Stream<T> inputs, 
BiFunction<T, Integer, R> f) {
       // given sketchy import, sequester for now within enclosing test class, 
rather than adding to `gobblin-utility`
-      return 
org.apache.iceberg.relocated.com.google.common.collect.Streams.zip(
+      return Streams.zip(
           inputs, IntStream.iterate(0, i -> i + 1).boxed(), f);
     }
   }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 20ea30610..096962320 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -199,11 +199,15 @@ public class IcebergTableTest extends HiveMetastoreTest {
     verifyManifestFiles(manifestFileInfos, 
snapshotInfo.getManifestFilePaths(), perSnapshotFilesets);
     verifyAnyOrder(snapshotInfo.getAllDataFilePaths(), 
flatten(perSnapshotFilesets), "data filepaths");
     // verify all aforementioned paths collectively equal `getAllPaths()`
+    boolean shouldIncludeMetadataPath = false;
     List<String> allPathsExpected = 
Lists.newArrayList(snapshotInfo.getManifestListPath());
-    snapshotInfo.getMetadataPath().ifPresent(allPathsExpected::add);
     allPathsExpected.addAll(snapshotInfo.getManifestFilePaths());
     allPathsExpected.addAll(snapshotInfo.getAllDataFilePaths());
-    verifyAnyOrder(snapshotInfo.getAllPaths(), allPathsExpected, "all paths, 
metadata and data");
+    verifyAnyOrder(snapshotInfo.getAllPaths(shouldIncludeMetadataPath), 
allPathsExpected, "all paths, metadata and data, except metadataPath itself");
+
+    boolean shouldIncludeMetadataPathIfAvailable = true;
+    snapshotInfo.getMetadataPath().ifPresent(allPathsExpected::add);
+    
verifyAnyOrder(snapshotInfo.getAllPaths(shouldIncludeMetadataPathIfAvailable), 
allPathsExpected, "all paths, metadata and data, including metadataPath");
   }
 
   protected String calcMetadataBasePath(TableIdentifier tableId) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 9e345d71a..e0559c4c7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -672,7 +672,7 @@ public class DagManager extends AbstractIdleService {
       log.info("Cancel flow with DagId {}", dagToCancel);
       if (this.dagToJobs.containsKey(dagToCancel)) {
         List<DagNode<JobExecutionPlan>> dagNodesToCancel = 
this.dagToJobs.get(dagToCancel);
-        log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+        log.info("Found {} DagNodes to cancel (DagId {}).", 
dagNodesToCancel.size(), dagToCancel);
         for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
           cancelDagNode(dagNodeToCancel);
         }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index f0be63366..04ca6f597 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -58,9 +58,10 @@ import org.quartz.impl.JobDetailImpl;
  * for a flow action event. After acquiring the lease, it persists the flow 
action event to the {@link DagActionStore}
  * to be eventually acted upon by the host with the active DagManager. Once it 
has completed this action, it will mark
  * the lease as completed by calling the
- * {@link MysqlMultiActiveLeaseArbiter.recordLeaseSuccess()} method. Hosts 
that do not gain the lease for the event,
- * instead schedule a reminder using the {@link SchedulerService} to check 
back in on the previous lease owner's
- * completion status after the lease should expire to ensure the event is 
handled in failure cases.
+ * {@link 
MysqlMultiActiveLeaseArbiter#recordLeaseSuccess(MultiActiveLeaseArbiter.LeaseObtainedStatus)}
 method. Hosts
+ * that do not gain the lease for the event, instead schedule a reminder using 
the {@link SchedulerService} to check
+ * back in on the previous lease owner's completion status after the lease 
should expire to ensure the event is handled
+ * in failure cases.
  */
 @Slf4j
 public class FlowTriggerHandler {

Reply via email to