This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5ad93db8dc Construct iceberg data files during commit step (#4140)
5ad93db8dc is described below
commit 5ad93db8dcefb859934ef3a31a88738a14a95b1b
Author: thisisArjit <[email protected]>
AuthorDate: Mon Sep 8 15:57:07 2025 +0530
Construct iceberg data files during commit step (#4140)
---
.../iceberg/IcebergOverwritePartitionsStep.java | 20 +---
.../copy/iceberg/IcebergPartitionCopyableFile.java | 59 ++++++++++
.../copy/iceberg/IcebergPartitionDataset.java | 123 +++++++++------------
.../copy/publisher/CopyDataPublisher.java | 25 ++++-
.../copy/iceberg/IcebergDatasetTest.java | 2 +-
.../IcebergOverwritePartitionsStepTest.java | 25 ++---
6 files changed, 145 insertions(+), 109 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index 13fb5d07b3..30dc2094f3 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
@@ -28,7 +27,6 @@ import java.util.concurrent.ExecutionException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.util.SerializationUtil;
import com.github.rholder.retry.Attempt;
import com.github.rholder.retry.RetryException;
@@ -38,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.commit.CommitStep;
@@ -59,8 +58,8 @@ import static
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
public class IcebergOverwritePartitionsStep implements CommitStep {
private final String destTableIdStr;
private final Properties properties;
- // Data files are kept as a list of base64 encoded strings for optimised
de-serialization.
- private final List<String> base64EncodedDataFiles;
+  // Data files are populated once all the copy tasks are done. Each
IcebergPartitionCopyableFile has a serialized data file.
+ @Setter private List<DataFile> dataFiles;
private final String partitionColName;
private final String partitionValue;
public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
@@ -74,14 +73,12 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
   * Constructs an {@code IcebergOverwritePartitionsStep} with the specified
parameters.
*
* @param destTableIdStr the identifier of the destination table as a string
- * @param base64EncodedDataFiles [from List<DataFiles>] the serialized data
files to be used for replacing partitions
* @param properties the properties containing configuration
*/
- public IcebergOverwritePartitionsStep(String destTableIdStr, String
partitionColName, String partitionValue, List<String> base64EncodedDataFiles,
Properties properties) {
+ public IcebergOverwritePartitionsStep(String destTableIdStr, String
partitionColName, String partitionValue, Properties properties) {
this.destTableIdStr = destTableIdStr;
this.partitionColName = partitionColName;
this.partitionValue = partitionValue;
- this.base64EncodedDataFiles = base64EncodedDataFiles;
this.properties = properties;
}
@@ -103,7 +100,6 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
    // our copying. Any new data written in the meantime to THE SAME
partitions we are about to overwrite will be
// clobbered and replaced by the copy entities from our execution.
IcebergTable destTable =
createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
- List<DataFile> dataFiles = getDataFiles();
try {
log.info("~{}~ Starting partition overwrite - partition: {}; value: {};
numDataFiles: {}; path[0]: {}",
this.destTableIdStr,
@@ -140,14 +136,6 @@ public class IcebergOverwritePartitionsStep implements
CommitStep {
}
}
- private List<DataFile> getDataFiles() {
- List<DataFile> dataFiles = new ArrayList<>(base64EncodedDataFiles.size());
- for (String base64EncodedDataFile : base64EncodedDataFiles) {
-
dataFiles.add(SerializationUtil.deserializeFromBase64(base64EncodedDataFile));
- }
- return dataFiles;
- }
-
protected IcebergCatalog createDestinationCatalog() throws IOException {
return IcebergDatasetFinder.createIcebergCatalog(this.properties,
IcebergDatasetFinder.CatalogLocation.DESTINATION);
}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java
new file mode 100644
index 0000000000..5a0bc6d8f3
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.SerializationUtil;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyableFile;
+
+
+/**
+ * An extension of {@link CopyableFile} that includes a base64-encoded Iceberg
{@link DataFile}.
+ */
+@Getter
+@Setter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+public class IcebergPartitionCopyableFile extends CopyableFile {
+
+ /**
+ * Base64-encoded Iceberg {@link DataFile} associated with this copyable
file.
+ */
+ private String base64EncodedDataFile;
+
+ public IcebergPartitionCopyableFile(CopyableFile copyableFile, DataFile
dataFile) {
+ super(copyableFile.getOrigin(), copyableFile.getDestination(),
copyableFile.getDestinationOwnerAndPermission(),
+ copyableFile.getAncestorsOwnerAndPermission(),
copyableFile.getChecksum(), copyableFile.getPreserve(),
+ copyableFile.getFileSet(), copyableFile.getOriginTimestamp(),
copyableFile.getUpstreamTimestamp(),
+ copyableFile.getAdditionalMetadata(), copyableFile.datasetOutputPath,
copyableFile.getDataFileVersionStrategy());
+ this.base64EncodedDataFile = SerializationUtil.serializeToBase64(dataFile);
+ }
+
+ public DataFile getDataFile() {
+ return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
+ }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 658f4a265d..5600b47f08 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -20,15 +20,13 @@ package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -42,9 +40,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.util.SerializationUtil;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.ImmutableList;
import com.google.common.base.Preconditions;
@@ -101,48 +97,16 @@ public class IcebergPartitionDataset extends
IcebergDataset {
// TODO: Refactor the IcebergDataset::generateCopyEntities to avoid code
duplication
// Differences are getting data files, copying ancestor permission and
adding post publish steps
String fileSet = this.getFileSetId();
- List<CopyEntity> copyEntities = Lists.newArrayList();
IcebergTable srcIcebergTable = getSrcIcebergTable();
List<DataFile> srcDataFiles =
srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
- Map<Path, DataFile> destDataFileBySrcPath =
calcDestDataFileBySrcPath(srcDataFiles);
- Configuration defaultHadoopConfiguration = new Configuration();
-
- for (Map.Entry<Path, FileStatus> entry :
calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
- Path destPath = entry.getKey();
- FileStatus srcFileStatus = entry.getValue();
- // TODO: should be the same FS each time; try creating once, reusing
thereafter, to not recreate wastefully
- FileSystem actualSourceFs =
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
-
- CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
- actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath),
copyConfig)
- .fileSet(fileSet)
- .datasetOutputPath(targetFs.getUri().getPath())
- .build();
-
- fileEntity.setSourceData(getSourceDataset(this.sourceFs));
- fileEntity.setDestinationData(getDestinationDataset(targetFs));
- copyEntities.add(fileEntity);
- }
-
- // Adding this check to avoid adding post publish step when there are no
files to copy.
- List<DataFile> destDataFiles = new
ArrayList<>(destDataFileBySrcPath.values());
- if (CollectionUtils.isNotEmpty(destDataFiles)) {
- copyEntities.add(createOverwritePostPublishStep(destDataFiles));
- }
-
- log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
- return copyEntities;
- }
- private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile>
srcDataFiles)
- throws IcebergTable.TableNotFoundException {
- String fileSet = this.getFileSetId();
- Map<Path, DataFile> destDataFileBySrcPath = new
ConcurrentHashMap<>(srcDataFiles.size());
if (srcDataFiles.isEmpty()) {
log.warn("~{}~ found no data files for partition col : {} with partition
value : {} to copy", fileSet,
this.partitionColumnName, this.partitionColValue);
- return destDataFileBySrcPath;
+ return new ArrayList<>(0);
}
+
+ // get source & destination write data locations to update data file paths
TableMetadata srcTableMetadata =
getSrcIcebergTable().accessTableMetadata();
TableMetadata destTableMetadata =
getDestIcebergTable().accessTableMetadata();
PartitionSpec partitionSpec = destTableMetadata.spec();
@@ -160,17 +124,58 @@ public class IcebergPartitionDataset extends
IcebergDataset {
destWriteDataLocation
);
}
- srcDataFiles.forEach(dataFile -> {
+
+ List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs,
srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec,
copyConfig);
+ // Adding this check to avoid adding post publish step when there are no
files to copy.
+ if (CollectionUtils.isNotEmpty(copyEntities)) {
+ copyEntities.add(createOverwritePostPublishStep());
+ }
+
+ log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+ return copyEntities;
+ }
+
+ private List<CopyEntity> getIcebergParitionCopyEntities(
+ FileSystem targetFs,
+ List<DataFile> srcDataFiles,
+ String srcWriteDataLocation,
+ String destWriteDataLocation,
+ PartitionSpec partitionSpec,
+ CopyConfiguration copyConfig) {
+ String fileSet = this.getFileSetId();
+ Configuration defaultHadoopConfiguration = new Configuration();
+ List<CopyEntity> copyEntities = Collections.synchronizedList(new
ArrayList<>(srcDataFiles.size()));
+ Function<Path, FileStatus> getFileStatus =
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
+
+ srcDataFiles.parallelStream().forEach(dataFile -> {
+ // create destination data file from source data file by replacing the
source path with destination path
String srcFilePath = dataFile.path().toString();
Path updatedDestFilePath = relocateDestPath(srcFilePath,
srcWriteDataLocation, destWriteDataLocation);
log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet,
srcFilePath, updatedDestFilePath);
- destDataFileBySrcPath.put(new Path(srcFilePath),
DataFiles.builder(partitionSpec)
+ DataFile destDataFile = DataFiles.builder(partitionSpec)
.copy(dataFile)
.withPath(updatedDestFilePath.toString())
- .build());
+ .build();
+
+ // get file status of source file
+ FileStatus srcFileStatus = getFileStatus.apply(new Path(srcFilePath));
+ try {
+ // TODO: should be the same FS each time; try creating once, reusing
thereafter, to not recreate wastefully
+ FileSystem actualSourceFs =
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+ // create copyable file entity
+ CopyableFile fileEntity =
CopyableFile.fromOriginAndDestination(actualSourceFs, srcFileStatus,
+ targetFs.makeQualified(updatedDestFilePath),
copyConfig).fileSet(fileSet)
+ .datasetOutputPath(targetFs.getUri().getPath()).build();
+ fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+ fileEntity.setDestinationData(getDestinationDataset(targetFs));
+ // add corresponding data file to each copyable iceberg partition file
+ IcebergPartitionCopyableFile icebergPartitionCopyableFile = new
IcebergPartitionCopyableFile(fileEntity, destDataFile);
+ copyEntities.add(icebergPartitionCopyableFile);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
});
- log.info("~{}~ created {} destination data files", fileSet,
destDataFileBySrcPath.size());
- return destDataFileBySrcPath;
+ return copyEntities;
}
private Path relocateDestPath(String curPathStr, String prefixToBeReplaced,
String prefixToReplaceWith) {
@@ -186,43 +191,17 @@ public class IcebergPartitionDataset extends
IcebergDataset {
return new Path(fileDir, newFileName);
}
- private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path,
DataFile> destDataFileBySrcPath)
- throws IOException {
- Function<Path, FileStatus> getFileStatus =
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
- Map<Path, FileStatus> srcFileStatusByDestFilePath = new
ConcurrentHashMap<>();
- try {
- srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
- .parallelStream()
- .collect(Collectors.toConcurrentMap(entry -> new
Path(entry.getValue().path().toString()),
- entry -> getFileStatus.apply(entry.getKey())));
- } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
- wrapper.rethrowWrapped();
- }
- return srcFileStatusByDestFilePath;
- }
-
- private PostPublishStep createOverwritePostPublishStep(List<DataFile>
destDataFiles) {
- List<String> serializedDataFiles =
getBase64EncodedDataFiles(destDataFiles);
-
+ private PostPublishStep createOverwritePostPublishStep() {
IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new
IcebergOverwritePartitionsStep(
this.getDestIcebergTable().getTableId().toString(),
this.partitionColumnName,
this.partitionColValue,
- serializedDataFiles,
this.properties
);
return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(),
icebergOverwritePartitionStep, 0);
}
- private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles)
{
- List<String> base64EncodedDataFiles = new
ArrayList<>(destDataFiles.size());
- for (DataFile dataFile : destDataFiles) {
-
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
- }
- return base64EncodedDataFiles;
- }
-
private Predicate<StructLike> createPartitionFilterPredicate() throws
IOException {
//TODO: Refactor it later using factory or other way to support different
types of filter predicate
// Also take into consideration creation of Expression Filter to be used
in overwrite api
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index ef8a0e72e8..0cc77a9aec 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy.publisher;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -28,6 +29,7 @@ import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@@ -58,6 +60,8 @@ import
org.apache.gobblin.data.management.copy.PreserveAttributes;
import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import
org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionCopyableFile;
+import
org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep;
import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
@@ -333,7 +337,7 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
log.info("[{}] Found {} pre-publish steps and {} post-publish steps.",
datasetAndPartition.identifier(),
prePublishSteps.size(), postPublishSteps.size());
- executeCommitSequence(prePublishSteps);
+ executeCommitSequence(prePublishSteps, new ArrayList<>(0));
if (statesHelper.hasAnyCopyableFile()) {
// Targets are always absolute, so we start moving from root (will skip
any existing directories).
@@ -351,12 +355,19 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
// ensure every successful state is committed
// WARNING: this MUST NOT run before the WU is actually executed--hence
NOT YET for post-publish steps!
// (that's because `WorkUnitState::getWorkingState()` returns
`WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs
yet to execute)
+
+ // maintain a list of iceberg partition data files to be used in
IcebergOverwritePartitionsStep if applicable.
+ // This is required for the final commit step where all the data files are
overwritten to partition.
+ List<DataFile> icebergDataFiles = new ArrayList<>();
for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
}
CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
if (copyEntity instanceof CopyableFile) {
+ if (copyEntity instanceof IcebergPartitionCopyableFile) {
+ icebergDataFiles.add(((IcebergPartitionCopyableFile)
copyEntity).getDataFile());
+ }
CopyableFile copyableFile = (CopyableFile) copyEntity;
if (wus.getWorkingState() == WorkingState.COMMITTED) {
// Committed files should exist in destination otherwise FNFE will
be thrown
@@ -384,7 +395,7 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
}
// execute `postPublishSteps` after preserving file attributes, as some,
like `SetPermissionCommitStep`, will themselves set permissions
- executeCommitSequence(postPublishSteps);
+ executeCommitSequence(postPublishSteps, icebergDataFiles);
// since `postPublishSteps` have now executed, finally ready to ensure
every successful WU state of those gets committed
for (WorkUnitState wus : statesHelper.getPostPublishStates()) {
@@ -410,8 +421,16 @@ public class CopyDataPublisher extends DataPublisher
implements UnpublishedHandl
Long.toString(datasetOriginTimestamp),
Long.toString(datasetUpstreamTimestamp), additionalMetadata);
}
- private static void executeCommitSequence(List<CommitStep> steps) throws
IOException {
+ private static void executeCommitSequence(List<CommitStep> steps,
List<DataFile> icebergDataFiles) throws IOException {
for (CommitStep step : steps) {
+ // if commit step is IcebergOverwritePartitionsStep, set the data files
to be used for overwriting partitions
+ // Every copy entity in the work unit state has one data file associated
with it. All data files are collected &
+ // passed to the commit step here.
+ if (step instanceof IcebergOverwritePartitionsStep) {
+ IcebergOverwritePartitionsStep overwriteStep =
(IcebergOverwritePartitionsStep) step;
+ log.info("Adding {} iceberg data files to
IcebergOverwritePartitionsStep", icebergDataFiles.size());
+ overwriteStep.setDataFiles(icebergDataFiles);
+ }
step.execute();
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index a446fbb1a7..80da530733 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -447,7 +447,7 @@ public class IcebergDatasetTest {
String objectType = new Gson().fromJson(json, JsonObject.class)
.getAsJsonPrimitive("object-type")
.getAsString();
- return
objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile");
+ return
objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile") ||
objectType.equals("org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionCopyableFile");
}
private static void
verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities,
Map<Path, FileStatus> expectedPathsAndFileStatuses) {
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index e6a626fca7..e4c1774e9a 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -18,7 +18,6 @@
package org.apache.gobblin.data.management.copy.iceberg;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -26,7 +25,6 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.util.SerializationUtil;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
@@ -44,7 +42,6 @@ public class IcebergOverwritePartitionsStepTest {
private IcebergTable mockIcebergTable;
private IcebergCatalog mockIcebergCatalog;
private Properties mockProperties;
- private List<String> base64EncodedDataFiles;
private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
@BeforeMethod
@@ -53,24 +50,15 @@ public class IcebergOverwritePartitionsStepTest {
mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
mockProperties = new Properties();
- base64EncodedDataFiles = getEncodedDummyDataFiles();
-
spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
- testPartitionColName, testPartitionColValue, base64EncodedDataFiles,
mockProperties));
+ testPartitionColName, testPartitionColValue, mockProperties));
+
+ spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
}
- private List<String> getEncodedDummyDataFiles() {
- List<DataFile> dummyDataFiles = createDummyDataFiles();
- List<String> base64EncodedDataFiles = new
ArrayList<>(dummyDataFiles.size());
- for (DataFile dataFile : dummyDataFiles) {
-
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
- }
- return base64EncodedDataFiles;
- }
-
@Test
public void testNeverIsCompleted() {
Assert.assertFalse(spyIcebergOverwritePartitionsStep.isCompleted());
@@ -125,7 +113,10 @@ public class IcebergOverwritePartitionsStepTest {
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX
+ "." + RETRY_TIMES,
Integer.toString(retryCount));
spyIcebergOverwritePartitionsStep = Mockito.spy(new
IcebergOverwritePartitionsStep(destTableIdStr,
- testPartitionColName, testPartitionColValue, base64EncodedDataFiles,
mockProperties));
+ testPartitionColName, testPartitionColValue, mockProperties));
+
+ spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
+
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
try {
@@ -142,7 +133,7 @@ public class IcebergOverwritePartitionsStepTest {
}
}
- private List<DataFile> createDummyDataFiles() {
+ private List<DataFile> getDummyDataFiles() {
DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/db/foo/data/datafile1.orc")
.withFileSizeInBytes(1234)