This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3061c7048 Add config for excluding 'metadata.json' in iceberg-distcp
data movement (#3869)
3061c7048 is described below
commit 3061c7048840fab0eccff2a42ef303b500e0f20c
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Jan 29 13:27:02 2024 -0800
Add config for excluding 'metadata.json' in iceberg-distcp data movement
(#3869)
---
.../management/copy/iceberg/IcebergDataset.java | 28 ++++---
.../copy/iceberg/IcebergDatasetFinder.java | 12 ++-
.../copy/iceberg/IcebergSnapshotInfo.java | 10 +--
.../copy/iceberg/IcebergDatasetTest.java | 91 ++++++++++++++--------
.../management/copy/iceberg/IcebergTableTest.java | 8 +-
.../service/modules/orchestration/DagManager.java | 2 +-
.../modules/orchestration/FlowTriggerHandler.java | 7 +-
7 files changed, 102 insertions(+), 56 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index 67a10a42d..dbd4f780a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -68,16 +68,15 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
private final IcebergTable destIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
+ protected final boolean shouldIncludeMetadataPath;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make
parameterizable, if desired
- /** Destination database name */
- public static final String DESTINATION_DATABASE_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database";
-
- public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable
destIcebergTable, Properties properties, FileSystem sourceFs) {
+ public IcebergDataset(IcebergTable srcIcebergTable, IcebergTable
destIcebergTable, Properties properties, FileSystem sourceFs, boolean
shouldIncludeMetadataPath) {
this.srcIcebergTable = srcIcebergTable;
this.destIcebergTable = destIcebergTable;
this.properties = properties;
this.sourceFs = sourceFs;
+ this.shouldIncludeMetadataPath = shouldIncludeMetadataPath;
}
@Override
@@ -135,7 +134,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
Collection<CopyEntity> generateCopyEntities(FileSystem targetFs,
CopyConfiguration copyConfig) throws IOException {
String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
- Map<Path, FileStatus> pathToFileStatus =
getFilePathsToFileStatus(targetFs, copyConfig);
+ Map<Path, FileStatus> pathToFileStatus =
getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath);
log.info("~{}~ found {} candidate source paths", fileSet,
pathToFileStatus.size());
Configuration defaultHadoopConfiguration = new Configuration();
@@ -168,9 +167,10 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
/**
* Finds all files of the Iceberg's current snapshot
+ * @param shouldIncludeMetadataPath whether to consider "metadata.json"
(`getMetadataPath()`) as eligible for inclusion
* @return a map of path, file status for each file that needs to be copied
*/
- protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem
targetFs, CopyConfiguration copyConfig) throws IOException {
+ protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem
targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath)
throws IOException {
IcebergTable icebergTable = this.getSrcIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results
while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget =
CheckedExceptionFunction.wrapToTunneled(pathStr ->
@@ -180,6 +180,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
);
// check first for case of nothing to replicate, to avoid needless
scanning of a potentially massive iceberg
+ // NOTE: if `shouldIncludeMetadataPath` was false during the prior
executions, this condition will be false
IcebergSnapshotInfo currentSnapshotOverview =
icebergTable.getCurrentSnapshotInfoOverviewOnly();
if
(currentSnapshotOverview.getMetadataPath().map(isPresentOnTarget).orElse(false)
&&
isPresentOnTarget.apply(currentSnapshotOverview.getManifestListPath())) {
@@ -207,7 +208,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
// some of the children pointed to within could have been copied
prior, when they previously appeared as a
// child of the current file's predecessor (which this new meta file
now replaces).
if (!isPresentOnTarget.apply(manListPath)) {
- List<String> missingPaths = snapshotInfo.getSnapshotApexPaths();
+ List<String> missingPaths =
snapshotInfo.getSnapshotApexPaths(shouldIncludeMetadataPath);
for (IcebergSnapshotInfo.ManifestFileInfo mfi :
snapshotInfo.getManifestFiles()) {
if (!isPresentOnTarget.apply(mfi.getManifestFilePath())) {
missingPaths.add(mfi.getManifestFilePath());
@@ -226,13 +227,20 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
} else {
log.info("~{}~ snapshot '{}' already present on target... skipping
(including contents)",
this.getFileSetId(), snapshotInfo.getSnapshotId());
- // IMPORTANT: separately consider metadata path, to handle case of
'metadata-only' snapshot reusing mf-list
+ // IMPORTANT: separately consider metadata path, to handle case of
'metadata-only' snapshot reusing manifest list
Optional<String> metadataPath = snapshotInfo.getMetadataPath();
Optional<String> nonReplicatedMetadataPath = metadataPath.filter(p
-> !isPresentOnTarget.apply(p));
metadataPath.ifPresent(ignore ->
- log.info("~{}~ metadata IS {} already present on target",
this.getFileSetId(), nonReplicatedMetadataPath.isPresent() ? "NOT" : "ALSO")
+ log.info("~{}~ metadata IS {} present on target",
+ this.getFileSetId(),
+ !nonReplicatedMetadataPath.isPresent()
+ ? "already"
+ : shouldIncludeMetadataPath ? "NOT YET" : "NOT CHOSEN
TO BE")
);
- return nonReplicatedMetadataPath.map(p ->
Lists.newArrayList(p).iterator()).orElse(Collections.emptyIterator());
+ return nonReplicatedMetadataPath
+ .filter(ignore -> shouldIncludeMetadataPath)
+ .map(p -> Lists.newArrayList(p).iterator())
+ .orElse(Collections.emptyIterator());
}
})
);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index dc407f38c..080d8e91d 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
@@ -52,6 +52,10 @@ import org.apache.gobblin.util.HadoopUtils;
@RequiredArgsConstructor
public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDataset> {
public static final String ICEBERG_DATASET_PREFIX =
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
+
+ public static final String ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH =
ICEBERG_DATASET_PREFIX + ".copy.metadata.path";
+ public static final String DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH
= "false";
+
public static final String DEFAULT_ICEBERG_CATALOG_CLASS =
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
public static final String ICEBERG_CATALOG_KEY = "catalog";
/**
@@ -149,7 +153,7 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
IcebergTable destIcebergTable =
destinationIcebergCatalog.openTable(destDbName, destTableName);
// TODO: Rethink strategy to enforce dest iceberg table
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName,
destTableName));
- return new IcebergDataset(srcIcebergTable, destIcebergTable, properties,
fs);
+ return new IcebergDataset(srcIcebergTable, destIcebergTable, properties,
fs, getConfigShouldCopyMetadataPath(properties));
}
protected static IcebergCatalog createIcebergCatalog(Properties properties,
CatalogLocation location) throws IOException {
@@ -161,6 +165,10 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
return IcebergCatalogFactory.create(icebergCatalogClassName,
catalogProperties, configuration);
}
+ protected static boolean getConfigShouldCopyMetadataPath(Properties
properties) {
+ return
Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH,
DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH));
+ }
+
/** @return property value or `null` */
protected static String getLocationQualifiedProperty(Properties properties,
CatalogLocation location, String relativePropName) {
return properties.getProperty(calcLocationQualifiedPropName(location,
relativePropName));
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index 42b871302..dc97b9ed6 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -56,15 +56,15 @@ public class IcebergSnapshotInfo {
return
manifestFiles.stream().map(ManifestFileInfo::getListedFilePaths).flatMap(List::stream).collect(Collectors.toList());
}
- /** @return the `manifestListPath` and `metadataPath`, if present */
- public List<String> getSnapshotApexPaths() {
- List<String> result =
metadataPath.map(Lists::newArrayList).orElse(Lists.newArrayList());
+ /** @return the `manifestListPath` and, if `shouldIncludeMetadataPath` and
it is present, also `metadataPath` */
+ public List<String> getSnapshotApexPaths(boolean shouldIncludeMetadataPath) {
+ List<String> result = metadataPath.map(Lists::newArrayList).filter(ignore
-> shouldIncludeMetadataPath).orElse(Lists.newArrayList());
result.add(manifestListPath);
return result;
}
- public List<String> getAllPaths() {
- List<String> result = getSnapshotApexPaths();
+ public List<String> getAllPaths(boolean shouldIncludeMetadataPath) {
+ List<String> result = getSnapshotApexPaths(shouldIncludeMetadataPath);
result.addAll(getManifestFilePaths());
result.addAll(getAllDataFilePaths());
return result;
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index a005eb315..1f23f7428 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -51,6 +51,7 @@ import org.testng.collections.Sets;
import com.google.api.client.util.Maps;
import com.google.common.collect.Lists;
+import com.google.common.collect.Streams;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
@@ -130,36 +131,40 @@ public class IcebergDatasetTest {
public void testGetFilePathsWhenDestEmpty() throws IOException {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
List<String> existingDestPaths = Lists.newArrayList();
- Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
- validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths);
+ boolean shouldIncludeMetadataPath = true;
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
shouldIncludeMetadataPath, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths, shouldIncludeMetadataPath);
}
@Test
public void testGetFilePathsWhenOneManifestListAtDest() throws IOException {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
- Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
SNAPSHOT_PATHS_0);
- validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths);
+ boolean shouldIncludeMetadataPath = true;
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
shouldIncludeMetadataPath, SNAPSHOT_PATHS_0);
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths, shouldIncludeMetadataPath);
}
@Test
public void testGetFilePathsWhenOneManifestAtDest() throws IOException {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
List<String> existingDestPaths = Lists.newArrayList(MANIFEST_PATH_1);
- Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
SNAPSHOT_PATHS_0);
+ boolean shouldIncludeMetadataPath = false;
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
shouldIncludeMetadataPath, SNAPSHOT_PATHS_0);
expectedResultPaths.add(new Path(MANIFEST_LIST_PATH_1)); // expect
manifest's parent, despite manifest subtree skip
- validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths);
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths, shouldIncludeMetadataPath);
}
@Test
public void testGetFilePathsWhenSomeDataFilesAtDest() throws IOException {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
List<String> existingDestPaths = Lists.newArrayList(MANIFEST_DATA_PATH_1B,
MANIFEST_DATA_PATH_0A);
- Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ boolean shouldIncludeMetadataPath = true;
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
shouldIncludeMetadataPath, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
// despite already existing on target, expect anyway: per-file check
skipped for optimization's sake
// expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_1B));
// expectedResultPaths.remove(new Path(MANIFEST_DATA_PATH_0A));
- validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths);
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths, shouldIncludeMetadataPath);
}
@Test
@@ -167,32 +172,44 @@ public class IcebergDatasetTest {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
// pretend this path doesn't exist on source:
Path missingPath = new Path(MANIFEST_DATA_PATH_0A);
- Set<Path> existingSourcePaths = withAllSnapshotPaths(Sets.newHashSet(),
SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ boolean shouldIncludeMetadataPath = false;
+ Set<Path> existingSourcePaths = withAllSnapshotPaths(Sets.newHashSet(),
shouldIncludeMetadataPath, SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
existingSourcePaths.remove(missingPath);
List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1);
- Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
SNAPSHOT_PATHS_0);
+ Set<Path> expectedResultPaths = withAllSnapshotPaths(Sets.newHashSet(),
shouldIncludeMetadataPath, SNAPSHOT_PATHS_0);
expectedResultPaths.remove(missingPath);
validateGetFilePathsGivenDestState(icebergSnapshots,
Optional.of(existingSourcePaths.stream().map(Path::toString).collect(Collectors.toList())),
existingDestPaths,
- expectedResultPaths);
+ expectedResultPaths, shouldIncludeMetadataPath);
}
@Test
public void testGetFilePathsWhenManifestListsAtDestButNotMetadata() throws
IOException {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1,
MANIFEST_LIST_PATH_0);
+ boolean shouldIncludeMetadataPath = true;
Set<Path> expectedResultPaths = Sets.newHashSet();
expectedResultPaths.add(new Path(METADATA_PATH));
- validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths);
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths, shouldIncludeMetadataPath);
+ }
+
+ @Test
+ public void
testGetFilePathsWhenManifestListsAtDestButNotMetadataYetThatIgnored() throws
IOException {
+ List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
+ List<String> existingDestPaths = Lists.newArrayList(MANIFEST_LIST_PATH_1,
MANIFEST_LIST_PATH_0);
+ boolean shouldIncludeMetadataPath = false;
+ Set<Path> expectedResultPaths = Sets.newHashSet(); // nothing expected!
+ validateGetFilePathsGivenDestState(icebergSnapshots, existingDestPaths,
expectedResultPaths, shouldIncludeMetadataPath);
}
@Test
public void testGetFilePathsWhenAllAtDest() throws IOException {
List<MockIcebergTable.SnapshotPaths> icebergSnapshots =
Lists.newArrayList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0);
List<String> existingDestPaths = Lists.newArrayList(METADATA_PATH,
MANIFEST_LIST_PATH_1, MANIFEST_LIST_PATH_0);
+ boolean shouldIncludeMetadataPath = true;
Set<Path> expectedResultPaths = Sets.newHashSet(); // not expecting any
delta
IcebergTable mockTable =
- validateGetFilePathsGivenDestState(icebergSnapshots,
existingDestPaths, expectedResultPaths);
+ validateGetFilePathsGivenDestState(icebergSnapshots,
existingDestPaths, expectedResultPaths, shouldIncludeMetadataPath);
// ensure short-circuiting was able to avert iceberg manifests scan
Mockito.verify(mockTable,
Mockito.times(1)).getCurrentSnapshotInfoOverviewOnly();
Mockito.verify(mockTable, Mockito.times(1)).getTableId();
@@ -206,13 +223,14 @@ public class IcebergDatasetTest {
MockFileSystemBuilder sourceFsBuilder = new
MockFileSystemBuilder(SRC_FS_URI);
FileSystem sourceFs = sourceFsBuilder.build();
- IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null,
new Properties(), sourceFs);
+ boolean shouldIncludeMetadataPathMakesNoDifference = true;
+ IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null,
new Properties(), sourceFs, shouldIncludeMetadataPathMakesNoDifference);
MockFileSystemBuilder destFsBuilder = new
MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destFsBuilder.build();
Mockito.doThrow(new IOException("Ha - not so
fast!")).when(destFs).getFileStatus(new
Path(SNAPSHOT_PATHS_0.manifestListPath));
CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
- icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+ icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration,
shouldIncludeMetadataPathMakesNoDifference);
}
/** Validate error consolidation used to streamline logging. */
@@ -247,7 +265,8 @@ public class IcebergDatasetTest {
TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
IcebergTable srcIcebergTbl =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_0));
IcebergTable destIcebergTbl =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl,
destIcebergTbl, new Properties(), sourceFs);
+ boolean shouldIncludeManifestPath = true;
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTbl,
destIcebergTbl, new Properties(), sourceFs, shouldIncludeManifestPath);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -262,7 +281,7 @@ public class IcebergDatasetTest {
/** Test generating copy entities for a multi-snapshot iceberg; given empty
dest, src-dest delta will be entirety */
@Test
public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws
IOException {
- List<String> expectedPaths = Arrays.asList(METADATA_PATH,
+ List<String> expectedPaths = Arrays.asList( // METADATA_PATH,
MANIFEST_LIST_PATH_0, MANIFEST_PATH_0, MANIFEST_DATA_PATH_0A,
MANIFEST_DATA_PATH_0B,
MANIFEST_LIST_PATH_1, MANIFEST_PATH_1, MANIFEST_DATA_PATH_1A,
MANIFEST_DATA_PATH_1B);
@@ -273,7 +292,8 @@ public class IcebergDatasetTest {
TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon, Arrays.asList(SNAPSHOT_PATHS_1,
SNAPSHOT_PATHS_0));
IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
+ boolean shouldIncludeManifestPath = false;
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -304,7 +324,8 @@ public class IcebergDatasetTest {
TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_0));
IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
+ boolean shouldIncludeManifestPath = true;
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -333,7 +354,8 @@ public class IcebergDatasetTest {
TableIdentifier tableIdInCommon = TableIdentifier.of(testDbName,
testTblName);
IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_0));
IcebergTable destIcebergTable =
MockIcebergTable.withSnapshots(tableIdInCommon,
Arrays.asList(SNAPSHOT_PATHS_1));
- IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs);
+ boolean shouldIncludeManifestPath = true;
+ IcebergDataset icebergDataset = new TrickIcebergDataset(srcIcebergTable,
destIcebergTable, new Properties(), sourceFs, shouldIncludeManifestPath);
MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI);
FileSystem destFs = destBuilder.build();
@@ -352,9 +374,9 @@ public class IcebergDatasetTest {
* @return {@link IcebergTable} (mock!), for behavioral verification
*/
protected IcebergTable
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths>
sourceSnapshotPathSets,
- List<String> existingDestPaths, Set<Path> expectedResultPaths) throws
IOException {
+ List<String> existingDestPaths, Set<Path> expectedResultPaths, boolean
shouldIncludeMetadataPath) throws IOException {
return validateGetFilePathsGivenDestState(sourceSnapshotPathSets,
Optional.empty(), existingDestPaths,
- expectedResultPaths);
+ expectedResultPaths, shouldIncludeMetadataPath);
}
/**
@@ -362,20 +384,21 @@ public class IcebergDatasetTest {
* @return {@link IcebergTable} (mock!), for behavioral verification
*/
protected IcebergTable
validateGetFilePathsGivenDestState(List<MockIcebergTable.SnapshotPaths>
sourceSnapshotPathSets,
- Optional<List<String>> optExistingSourcePaths, List<String>
existingDestPaths, Set<Path> expectedResultPaths) throws IOException {
+ Optional<List<String>> optExistingSourcePaths, List<String>
existingDestPaths, Set<Path> expectedResultPaths,
+ boolean shouldIncludeMetadataPath) throws IOException {
IcebergTable srcIcebergTable =
MockIcebergTable.withSnapshots(TableIdentifier.of(testDbName, testTblName),
sourceSnapshotPathSets);
MockFileSystemBuilder sourceFsBuilder = new
MockFileSystemBuilder(SRC_FS_URI, !optExistingSourcePaths.isPresent());
optExistingSourcePaths.ifPresent(sourceFsBuilder::addPaths);
FileSystem sourceFs = sourceFsBuilder.build();
- IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null,
new Properties(), sourceFs);
+ IcebergDataset icebergDataset = new IcebergDataset(srcIcebergTable, null,
new Properties(), sourceFs, shouldIncludeMetadataPath);
MockFileSystemBuilder destFsBuilder = new
MockFileSystemBuilder(DEST_FS_URI);
destFsBuilder.addPaths(existingDestPaths);
FileSystem destFs = destFsBuilder.build();
CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
- Map<Path, FileStatus> filePathsToFileStatus =
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration);
+ Map<Path, FileStatus> filePathsToFileStatus =
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration,
shouldIncludeMetadataPath);
Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths);
// verify solely the path portion of the `FileStatus`, since that's all
mock sets up
Assert.assertEquals(
@@ -385,9 +408,9 @@ public class IcebergDatasetTest {
}
/** @return `paths` after adding to it all paths of every one of
`snapshotDefs` */
- protected static Set<Path> withAllSnapshotPaths(Set<Path> paths,
MockIcebergTable.SnapshotPaths... snapshotDefs) {
+ protected static Set<Path> withAllSnapshotPaths(Set<Path> paths, boolean
shouldIncludeMetadataPath, MockIcebergTable.SnapshotPaths... snapshotDefs) {
Arrays.stream(snapshotDefs).flatMap(snapshotDef ->
- snapshotDef.asSnapshotInfo().getAllPaths().stream())
+
snapshotDef.asSnapshotInfo().getAllPaths(shouldIncludeMetadataPath).stream())
.forEach(p ->
paths.add(new Path(p))
);
@@ -464,16 +487,18 @@ public class IcebergDatasetTest {
}
/**
- * Sadly, this is needed to avoid losing `FileSystem` mock to replacement
from the `FileSystem.get` `static`
- * Without this, so to lose the mock, we'd be unable to set up any source
paths as existing.
+ * Without this, our {@link FileSystem} mock would be lost by replacement
from the {@link FileSystem#get} static, and
+ * that would prevent tests from effectively setting up certain source
paths as existing.
+ * Instead override {@link
IcebergDataset#getSourceFileSystemFromFileStatus(FileStatus, Configuration)} so
that static
+ * is avoided entirely.
*/
protected static class TrickIcebergDataset extends IcebergDataset {
public TrickIcebergDataset(IcebergTable srcIcebergTable, IcebergTable
destIcebergTable, Properties properties,
- FileSystem sourceFs) {
- super(srcIcebergTable, destIcebergTable, properties, sourceFs);
+ FileSystem sourceFs, boolean shouldIncludeManifestPath) {
+ super(srcIcebergTable, destIcebergTable, properties, sourceFs,
shouldIncludeManifestPath);
}
- @Override // as the `static` is not mock-able
+ @Override // as the `static` itself is not directly mock-able
protected FileSystem getSourceFileSystemFromFileStatus(FileStatus
fileStatus, Configuration hadoopConfig) throws IOException {
return this.sourceFs;
}
@@ -604,7 +629,7 @@ public class IcebergDatasetTest {
/** @return {@link Stream} equivalent of `inputs.zipWithIndex.map(f)` in
scala */
public static <T, R> Stream<R> transformWithIndex(Stream<T> inputs,
BiFunction<T, Integer, R> f) {
// given sketchy import, sequester for now within enclosing test class,
rather than adding to `gobblin-utility`
- return
org.apache.iceberg.relocated.com.google.common.collect.Streams.zip(
+ return Streams.zip(
inputs, IntStream.iterate(0, i -> i + 1).boxed(), f);
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index 20ea30610..096962320 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -199,11 +199,15 @@ public class IcebergTableTest extends HiveMetastoreTest {
verifyManifestFiles(manifestFileInfos,
snapshotInfo.getManifestFilePaths(), perSnapshotFilesets);
verifyAnyOrder(snapshotInfo.getAllDataFilePaths(),
flatten(perSnapshotFilesets), "data filepaths");
// verify all aforementioned paths collectively equal `getAllPaths()`
+ boolean shouldIncludeMetadataPath = false;
List<String> allPathsExpected =
Lists.newArrayList(snapshotInfo.getManifestListPath());
- snapshotInfo.getMetadataPath().ifPresent(allPathsExpected::add);
allPathsExpected.addAll(snapshotInfo.getManifestFilePaths());
allPathsExpected.addAll(snapshotInfo.getAllDataFilePaths());
- verifyAnyOrder(snapshotInfo.getAllPaths(), allPathsExpected, "all paths,
metadata and data");
+ verifyAnyOrder(snapshotInfo.getAllPaths(shouldIncludeMetadataPath),
allPathsExpected, "all paths, metadata and data, except metadataPath itself");
+
+ boolean shouldIncludeMetadataPathIfAvailable = true;
+ snapshotInfo.getMetadataPath().ifPresent(allPathsExpected::add);
+
verifyAnyOrder(snapshotInfo.getAllPaths(shouldIncludeMetadataPathIfAvailable),
allPathsExpected, "all paths, metadata and data, including metadataPath");
}
protected String calcMetadataBasePath(TableIdentifier tableId) {
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index 9e345d71a..e0559c4c7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -672,7 +672,7 @@ public class DagManager extends AbstractIdleService {
log.info("Cancel flow with DagId {}", dagToCancel);
if (this.dagToJobs.containsKey(dagToCancel)) {
List<DagNode<JobExecutionPlan>> dagNodesToCancel =
this.dagToJobs.get(dagToCancel);
- log.info("Found {} DagNodes to cancel.", dagNodesToCancel.size());
+ log.info("Found {} DagNodes to cancel (DagId {}).",
dagNodesToCancel.size(), dagToCancel);
for (DagNode<JobExecutionPlan> dagNodeToCancel : dagNodesToCancel) {
cancelDagNode(dagNodeToCancel);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
index f0be63366..04ca6f597 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowTriggerHandler.java
@@ -58,9 +58,10 @@ import org.quartz.impl.JobDetailImpl;
* for a flow action event. After acquiring the lease, it persists the flow
action event to the {@link DagActionStore}
* to be eventually acted upon by the host with the active DagManager. Once it
has completed this action, it will mark
* the lease as completed by calling the
- * {@link MysqlMultiActiveLeaseArbiter.recordLeaseSuccess()} method. Hosts
that do not gain the lease for the event,
- * instead schedule a reminder using the {@link SchedulerService} to check
back in on the previous lease owner's
- * completion status after the lease should expire to ensure the event is
handled in failure cases.
+ * {@link
MysqlMultiActiveLeaseArbiter#recordLeaseSuccess(MultiActiveLeaseArbiter.LeaseObtainedStatus)}
method. Hosts
+ * that do not gain the lease for the event, instead schedule a reminder using
the {@link SchedulerService} to check
+ * back in on the previous lease owner's completion status after the lease
should expire to ensure the event is handled
+ * in failure cases.
*/
@Slf4j
public class FlowTriggerHandler {