This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f6f1bc51d [GOBBLIN-1994] Ensure iceberg-distcp consistency by using
same `TableMetadata` for both WU planning and final commit (#3870)
f6f1bc51d is described below
commit f6f1bc51d3fc020add7e7a1ee6760432f197870d
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Jan 31 13:47:15 2024 -0800
[GOBBLIN-1994] Ensure iceberg-distcp consistency by using same
`TableMetadata` for both WU planning and final commit (#3870)
Ensure iceberg-distcp consistency by using same `TableMetadata` for both WU
planning and final commit
---
.../management/copy/iceberg/IcebergDataset.java | 72 +++++++++++++++++++---
.../copy/iceberg/IcebergDatasetFinder.java | 2 +-
.../copy/iceberg/IcebergRegisterStep.java | 35 ++++++++---
.../copy/iceberg/IcebergSnapshotInfo.java | 6 +-
.../data/management/copy/iceberg/IcebergTable.java | 16 +++--
.../copy/iceberg/IcebergDatasetTest.java | 13 +++-
6 files changed, 115 insertions(+), 29 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index dbd4f780a..9e6364c32 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -40,9 +41,12 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import javax.annotation.concurrent.NotThreadSafe;
+import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.iceberg.TableMetadata;
+
import org.apache.gobblin.data.management.copy.CopyConfiguration;
import org.apache.gobblin.data.management.copy.CopyEntity;
import org.apache.gobblin.data.management.copy.CopyableDataset;
@@ -101,6 +105,7 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
public Iterator<FileSet<CopyEntity>> getFileSetIterator(FileSystem targetFs,
CopyConfiguration configuration) {
return createFileSets(targetFs, configuration);
}
+
/**
* Finds all files read by the table and generates CopyableFiles.
* For the specific semantics see {@link #createFileSets}.
@@ -126,6 +131,14 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
return Iterators.singletonIterator(fileSet);
}
+ /** Conveys {@link Path}-to-{@link FileStatus} mapping in combination with
{@link TableMetadata} originating from the same atomic read of those paths */
+ @Data
+ @VisibleForTesting
+ protected static final class GetFilePathsToFileStatusResult {
+ private final Map<Path, FileStatus> pathsToFileStatus;
+ private final TableMetadata tableMetadata;
+ }
+
/**
* Finds all files, data and metadata, as {@link CopyEntity}s that comprise
the table and fully specify remaining
* table replication.
@@ -134,7 +147,9 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
Collection<CopyEntity> generateCopyEntities(FileSystem targetFs,
CopyConfiguration copyConfig) throws IOException {
String fileSet = this.getFileSetId();
List<CopyEntity> copyEntities = Lists.newArrayList();
- Map<Path, FileStatus> pathToFileStatus =
getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath);
+ TableMetadata destTableMetadataBeforeSrcRead =
getCurrentDestTableMetadata();
+ GetFilePathsToFileStatusResult atomicGetPathsResult =
getFilePathsToFileStatus(targetFs, copyConfig, this.shouldIncludeMetadataPath);
+ Map<Path, FileStatus> pathToFileStatus =
atomicGetPathsResult.getPathsToFileStatus();
log.info("~{}~ found {} candidate source paths", fileSet,
pathToFileStatus.size());
Configuration defaultHadoopConfiguration = new Configuration();
@@ -159,18 +174,32 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
}
- // TODO: Filter properties specific to iceberg registration and avoid
serializing every global property
- copyEntities.add(createPostPublishStep());
+
+
copyEntities.add(createPostPublishStep(atomicGetPathsResult.getTableMetadata(),
destTableMetadataBeforeSrcRead));
log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
return copyEntities;
}
/**
- * Finds all files of the Iceberg's current snapshot
+ * @return {@link TableMetadata} current at destination
+ * @throws IOException if `this.destIcebergTable` does not exist
+ */
+ protected TableMetadata getCurrentDestTableMetadata() throws IOException {
+ try {
+ return destIcebergTable.accessTableMetadata();
+ } catch (IcebergTable.TableNotFoundException tnfe) {
+ log.error("No destination TableMetadata because table not found: '" +
destIcebergTable.getTableId() + "'" , tnfe);
+ throw new IOException(tnfe);
+ }
+ }
+
+ /**
+ * Finds all files of the Iceberg's current snapshot and also returns the
{@link TableMetadata} atomically accessed while reading those paths
* @param shouldIncludeMetadataPath whether to consider "metadata.json"
(`getMetadataPath()`) as eligible for inclusion
- * @return a map of path, file status for each file that needs to be copied
+ * @return combined result: both a map of path to file status for each file
to copy plus the {@link TableMetadata}, atomically observed with those paths
*/
- protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem
targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath)
throws IOException {
+ protected GetFilePathsToFileStatusResult getFilePathsToFileStatus(FileSystem
targetFs, CopyConfiguration copyConfig, boolean shouldIncludeMetadataPath)
+ throws IOException {
IcebergTable icebergTable = this.getSrcIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results
while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget =
CheckedExceptionFunction.wrapToTunneled(pathStr ->
@@ -188,11 +217,19 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
currentSnapshotOverview.getManifestListPath(),
currentSnapshotOverview.getMetadataPath().orElse("<<ERROR:
MISSING!>>"));
- return Maps.newHashMap();
+ TableMetadata readTimeTableMetadata =
currentSnapshotOverview.getTableMetadata().orElseThrow(() -> new
RuntimeException(
+ String.format("~%s~ no table metadata for current snapshot '%s' at
'%s' with metadata path '%s'",
+ this.getFileSetId(), currentSnapshotOverview.getSnapshotId(),
+ currentSnapshotOverview.getManifestListPath(),
+ currentSnapshotOverview.getMetadataPath().orElse("<<ERROR:
MISSING!>>"))));
+ return new GetFilePathsToFileStatusResult(Maps.newHashMap(),
readTimeTableMetadata);
}
+
+ List<TableMetadata> readTimeTableMetadataHolder = Lists.newArrayList(); //
expecting exactly one elem
Iterator<IcebergSnapshotInfo> icebergIncrementalSnapshotInfos =
icebergTable.getIncrementalSnapshotInfosIterator();
Iterator<String> filePathsIterator = Iterators.concat(
Iterators.transform(icebergIncrementalSnapshotInfos, snapshotInfo -> {
+
snapshotInfo.getTableMetadata().ifPresent(readTimeTableMetadataHolder::add);
// log each snapshot, for context, in case of
`FileNotFoundException` during `FileSystem.getFileStatus()`
String manListPath = snapshotInfo.getManifestListPath();
log.info("~{}~ loaded snapshot '{}' at '{}' from metadata path:
'{}'", this.getFileSetId(),
@@ -276,7 +313,20 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
} catch (CheckedExceptionFunction.WrappedIOException wrapper) {
wrapper.rethrowWrapped();
}
- return results;
+
+ if (readTimeTableMetadataHolder.size() != 1) {
+ final int firstNumToShow = 5;
+ String newline = System.lineSeparator();
+ String errMsg = readTimeTableMetadataHolder.size() == 0
+ ? String.format("~%s~ no table metadata ever encountered!",
this.getFileSetId())
+ : String.format("~%s~ multiple metadata (%d) encountered (exactly 1
expected) - first %d: [%s]",
+ this.getFileSetId(), readTimeTableMetadataHolder.size(),
firstNumToShow,
+
readTimeTableMetadataHolder.stream().limit(firstNumToShow).map(md ->
+ md.uuid() + " - " + md.metadataFileLocation()
+ ).collect(Collectors.joining(newline, newline, newline)));
+ throw new RuntimeException(errMsg);
+ }
+ return new GetFilePathsToFileStatusResult(results,
readTimeTableMetadataHolder.get(0));
}
/**
@@ -329,8 +379,10 @@ public class IcebergDataset implements
PrioritizedCopyableDataset {
return this.destIcebergTable.getDatasetDescriptor(targetFs);
}
- private PostPublishStep createPostPublishStep() {
- IcebergRegisterStep icebergRegisterStep = new
IcebergRegisterStep(this.srcIcebergTable.getTableId(),
this.destIcebergTable.getTableId(), this.properties);
+ private PostPublishStep createPostPublishStep(TableMetadata
readTimeSrcTableMetadata, TableMetadata justPriorDestTableMetadata) {
+ // TODO: Filter properties specific to iceberg registration and avoid
serializing every global property
+ IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(
+ this.srcIcebergTable.getTableId(), this.destIcebergTable.getTableId(),
readTimeSrcTableMetadata, justPriorDestTableMetadata, this.properties);
return new PostPublishStep(getFileSetId(), Maps.newHashMap(),
icebergRegisterStep, 0);
}
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index 080d8e91d..f6668f5d1 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -53,7 +53,7 @@ import org.apache.gobblin.util.HadoopUtils;
public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDataset> {
public static final String ICEBERG_DATASET_PREFIX =
DatasetConstants.PLATFORM_ICEBERG + ".dataset";
- public static final String ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH =
ICEBERG_DATASET_PREFIX + ".copy.metadata.path";
+ public static final String ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH =
ICEBERG_DATASET_PREFIX + ".should.copy.metadata.path";
public static final String DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH
= "false";
public static final String DEFAULT_ICEBERG_CATALOG_CLASS =
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index 8beb72e66..7345e9a00 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -29,19 +29,32 @@ import org.apache.gobblin.commit.CommitStep;
/**
- * {@link CommitStep} to perform Iceberg registration.
+ * {@link CommitStep} to perform Iceberg registration. It is critically
important to use the same source-side {@link TableMetadata} observed while
+ * listing the source table and the dest-side {@link TableMetadata} observed
just prior to that listing of the source table. Either table may have
+ * changed between first calculating the source-to-dest difference and now
performing the commit on the destination (herein). Accordingly, use of
+ * now-current metadata could thwart consistency. Only metadata preserved
from the time of the difference calc guarantees correctness.
+ *
+ * - if the source table has since changed, we nonetheless use the metadata
originally observed, since now-current metadata wouldn't match the
+ * files just copied to dest
+ * - if the dest table has since changed, we reject the commit altogether to
force the diff calc to restart (in a subsequent execution)
*/
@Slf4j
public class IcebergRegisterStep implements CommitStep {
// store as string for serializability... TODO: explore whether truly
necessary (or we could just as well store as `TableIdentifier`)
- private final String srcTableIdStr;
+ private final String srcTableIdStr; // used merely for naming within trace
logging
private final String destTableIdStr;
+ private final TableMetadata readTimeSrcTableMetadata;
+ private final TableMetadata justPriorDestTableMetadata;
private final Properties properties;
- public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier
destTableId, Properties properties) {
+ public IcebergRegisterStep(TableIdentifier srcTableId, TableIdentifier
destTableId,
+ TableMetadata readTimeSrcTableMetadata, TableMetadata
justPriorDestTableMetadata,
+ Properties properties) {
this.srcTableIdStr = srcTableId.toString();
this.destTableIdStr = destTableId.toString();
+ this.readTimeSrcTableMetadata = readTimeSrcTableMetadata;
+ this.justPriorDestTableMetadata = justPriorDestTableMetadata;
this.properties = properties;
}
@@ -52,18 +65,24 @@ public class IcebergRegisterStep implements CommitStep {
@Override
public void execute() throws IOException {
- IcebergTable srcIcebergTable =
IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.SOURCE)
- .openTable(TableIdentifier.parse(srcTableIdStr));
IcebergTable destIcebergTable =
IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION)
.openTable(TableIdentifier.parse(destTableIdStr));
- TableMetadata destinationMetadata = null;
+ // NOTE: solely by-product of probing table's existence: metadata recorded
just prior to reading from source catalog is what's actually used
+ TableMetadata unusedNowCurrentDestMetadata = null;
try {
- destinationMetadata = destIcebergTable.accessTableMetadata(); //
probe... (first access could throw)
+ unusedNowCurrentDestMetadata = destIcebergTable.accessTableMetadata();
// probe... (first access could throw)
+ log.info("~{}~ (destination) (using) TableMetadata: {} - {} {}=
(current) TableMetadata: {} - {}",
+ destTableIdStr,
+ justPriorDestTableMetadata.uuid(),
justPriorDestTableMetadata.metadataFileLocation(),
+
unusedNowCurrentDestMetadata.uuid().equals(justPriorDestTableMetadata.uuid()) ?
"=" : "!",
+ unusedNowCurrentDestMetadata.uuid(),
unusedNowCurrentDestMetadata.metadataFileLocation());
} catch (IcebergTable.TableNotFoundException tnfe) {
log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
}
-
destIcebergTable.registerIcebergTable(srcIcebergTable.accessTableMetadata(),
destinationMetadata);
+ // TODO: decide whether helpful to construct a more detailed error message
about `justPriorDestTableMetadata` being no-longer current
+ destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata,
justPriorDestTableMetadata);
}
+
@Override
public String toString() {
return String.format("Registering Iceberg Table: {%s} (dest); (src:
{%s})", this.destTableIdStr, this.srcTableIdStr);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
index dc97b9ed6..88c7f3ad5 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSnapshotInfo.java
@@ -27,6 +27,8 @@ import lombok.Data;
import com.google.common.collect.Lists;
+import org.apache.iceberg.TableMetadata;
+
/**
* Information about the metadata file and data file paths of a single Iceberg
Snapshot.
@@ -43,8 +45,10 @@ public class IcebergSnapshotInfo {
private final Long snapshotId;
private final Instant timestamp;
- /** only for the current snapshot, being whom the metadata file 'belongs
to'; `isEmpty()` for all other snapshots */
+ /** present only for the snapshot designated 'current', to which the metadata file
'belongs'; `isEmpty()` for all other snapshots */
private final Optional<String> metadataPath;
+ /** present only for the snapshot designated 'current', to which the metadata
'belongs'; `isEmpty()` for all other snapshots */
+ private final Optional<TableMetadata> tableMetadata;
private final String manifestListPath;
private final List<ManifestFileInfo> manifestFiles;
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 245298bcb..1a56a8d93 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -86,13 +86,13 @@ public class IcebergTable {
/** @return metadata info limited to the most recent (current) snapshot */
public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException {
TableMetadata current = accessTableMetadata();
- return createSnapshotInfo(current.currentSnapshot(),
Optional.of(current.metadataFileLocation()));
+ return createSnapshotInfo(current.currentSnapshot(),
Optional.of(current.metadataFileLocation()), Optional.of(current));
}
/** @return metadata info for most recent snapshot, wherein manifests and
their child data files ARE NOT listed */
public IcebergSnapshotInfo getCurrentSnapshotInfoOverviewOnly() throws
IOException {
TableMetadata current = accessTableMetadata();
- return createSnapshotInfo(current.currentSnapshot(),
Optional.of(current.metadataFileLocation()), true);
+ return createSnapshotInfo(current.currentSnapshot(),
Optional.of(current.metadataFileLocation()), Optional.of(current), true);
}
/** @return metadata info for all known snapshots, ordered historically,
with *most recent last* */
@@ -104,7 +104,8 @@ public class IcebergTable {
try {
return IcebergTable.this.createSnapshotInfo(
snapshot,
- currentSnapshotId == snapshot.snapshotId() ?
Optional.of(current.metadataFileLocation()) : Optional.empty()
+ currentSnapshotId == snapshot.snapshotId() ?
Optional.of(current.metadataFileLocation()) : Optional.empty(),
+ currentSnapshotId == snapshot.snapshotId() ? Optional.of(current)
: Optional.empty()
);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -160,16 +161,19 @@ public class IcebergTable {
return Optional.ofNullable(current).orElseThrow(() -> new
TableNotFoundException(this.tableId));
}
- protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot,
Optional<String> metadataFileLocation) throws IOException {
- return createSnapshotInfo(snapshot, metadataFileLocation, false);
+ protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot,
Optional<String> metadataFileLocation, Optional<TableMetadata>
currentTableMetadata)
+ throws IOException {
+ return createSnapshotInfo(snapshot, metadataFileLocation,
currentTableMetadata, false);
}
- protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot,
Optional<String> metadataFileLocation, boolean skipManifestFileInfo) throws
IOException {
+ protected IcebergSnapshotInfo createSnapshotInfo(Snapshot snapshot,
Optional<String> metadataFileLocation, Optional<TableMetadata>
currentTableMetadata,
+ boolean skipManifestFileInfo) throws IOException {
// TODO: verify correctness, even when handling 'delete manifests'!
return new IcebergSnapshotInfo(
snapshot.snapshotId(),
Instant.ofEpochMilli(snapshot.timestampMillis()),
metadataFileLocation,
+ currentTableMetadata,
snapshot.manifestListLocation(),
// NOTE: unable to `.stream().map(m -> calcManifestFileInfo(m,
tableOps.io()))` due to checked exception
skipManifestFileInfo ? Lists.newArrayList() :
calcAllManifestFileInfos(snapshot.allManifests(tableOps.io()), tableOps.io())
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index 1f23f7428..c5e20ed72 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.TableIdentifier;
+
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
@@ -398,7 +400,8 @@ public class IcebergDatasetTest {
FileSystem destFs = destFsBuilder.build();
CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs);
- Map<Path, FileStatus> filePathsToFileStatus =
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration,
shouldIncludeMetadataPath);
+ IcebergDataset.GetFilePathsToFileStatusResult pathsResult =
icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration,
shouldIncludeMetadataPath);
+ Map<Path, FileStatus> filePathsToFileStatus =
pathsResult.getPathsToFileStatus();
Assert.assertEquals(filePathsToFileStatus.keySet(), expectedResultPaths);
// verify solely the path portion of the `FileStatus`, since that's all
mock sets up
Assert.assertEquals(
@@ -596,6 +599,8 @@ public class IcebergDatasetTest {
private final String manifestListPath;
private final List<IcebergSnapshotInfo.ManifestFileInfo> manifestFiles;
+ private static final TableMetadata unusedStubMetadata =
Mockito.mock(TableMetadata.class);
+
public IcebergSnapshotInfo asSnapshotInfo() {
return asSnapshotInfo(0L);
}
@@ -606,7 +611,9 @@ public class IcebergDatasetTest {
}
public IcebergSnapshotInfo asSnapshotInfo(Long snapshotId, Instant
timestamp) {
- return new IcebergSnapshotInfo(snapshotId, timestamp,
this.metadataPath, this.manifestListPath, this.manifestFiles);
+ return new IcebergSnapshotInfo(snapshotId, timestamp,
this.metadataPath,
+ this.metadataPath.map(ignore -> unusedStubMetadata), // only set
when `metadataPath.isPresent()`
+ this.manifestListPath, this.manifestFiles);
}
}