This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad93db8dc Construct iceberg data files during commit step (#4140)
5ad93db8dc is described below

commit 5ad93db8dcefb859934ef3a31a88738a14a95b1b
Author: thisisArjit <[email protected]>
AuthorDate: Mon Sep 8 15:57:07 2025 +0530

    Construct iceberg data files during commit step (#4140)
---
 .../iceberg/IcebergOverwritePartitionsStep.java    |  20 +---
 .../copy/iceberg/IcebergPartitionCopyableFile.java |  59 ++++++++++
 .../copy/iceberg/IcebergPartitionDataset.java      | 123 +++++++++------------
 .../copy/publisher/CopyDataPublisher.java          |  25 ++++-
 .../copy/iceberg/IcebergDatasetTest.java           |   2 +-
 .../IcebergOverwritePartitionsStepTest.java        |  25 ++---
 6 files changed, 145 insertions(+), 109 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
index 13fb5d07b3..30dc2094f3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
 import java.time.Duration;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
@@ -28,7 +27,6 @@ import java.util.concurrent.ExecutionException;
 
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.util.SerializationUtil;
 
 import com.github.rholder.retry.Attempt;
 import com.github.rholder.retry.RetryException;
@@ -38,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.commit.CommitStep;
@@ -59,8 +58,8 @@ import static 
org.apache.gobblin.util.retry.RetryerFactory.RetryType;
 public class IcebergOverwritePartitionsStep implements CommitStep {
   private final String destTableIdStr;
   private final Properties properties;
-  // Data files are kept as a list of base64 encoded strings for optimised 
de-serialization.
-  private final List<String> base64EncodedDataFiles;
+  // data files are populated once all the copy tasks are done. Each 
IcebergPartitionCopyableFile has a serialized data file
+  @Setter private List<DataFile> dataFiles;
   private final String partitionColName;
   private final String partitionValue;
   public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = 
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
@@ -74,14 +73,12 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
    * Constructs an {@code IcebergReplacePartitionsStep} with the specified 
parameters.
    *
    * @param destTableIdStr the identifier of the destination table as a string
-   * @param base64EncodedDataFiles [from List<DataFiles>] the serialized data 
files to be used for replacing partitions
    * @param properties the properties containing configuration
    */
-  public IcebergOverwritePartitionsStep(String destTableIdStr, String 
partitionColName, String partitionValue, List<String> base64EncodedDataFiles, 
Properties properties) {
+  public IcebergOverwritePartitionsStep(String destTableIdStr, String 
partitionColName, String partitionValue, Properties properties) {
     this.destTableIdStr = destTableIdStr;
     this.partitionColName = partitionColName;
     this.partitionValue = partitionValue;
-    this.base64EncodedDataFiles = base64EncodedDataFiles;
     this.properties = properties;
   }
 
@@ -103,7 +100,6 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
     // our copying. any new data written in the meanwhile to THE SAME 
partitions we are about to overwrite will be
     // clobbered and replaced by the copy entities from our execution.
     IcebergTable destTable = 
createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
-    List<DataFile> dataFiles = getDataFiles();
     try {
       log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; 
numDataFiles: {}; path[0]: {}",
           this.destTableIdStr,
@@ -140,14 +136,6 @@ public class IcebergOverwritePartitionsStep implements 
CommitStep {
     }
   }
 
-  private List<DataFile> getDataFiles() {
-    List<DataFile> dataFiles = new ArrayList<>(base64EncodedDataFiles.size());
-    for (String base64EncodedDataFile : base64EncodedDataFiles) {
-      
dataFiles.add(SerializationUtil.deserializeFromBase64(base64EncodedDataFile));
-    }
-    return dataFiles;
-  }
-
   protected IcebergCatalog createDestinationCatalog() throws IOException {
     return IcebergDatasetFinder.createIcebergCatalog(this.properties, 
IcebergDatasetFinder.CatalogLocation.DESTINATION);
   }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java
new file mode 100644
index 0000000000..5a0bc6d8f3
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionCopyableFile.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.util.SerializationUtil;
+
+import lombok.AccessLevel;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyableFile;
+
+
+/**
+ * An extension of {@link CopyableFile} that includes a base64-encoded Iceberg 
{@link DataFile}.
+ */
+@Getter
+@Setter
+@NoArgsConstructor(access = AccessLevel.PROTECTED)
+@EqualsAndHashCode(callSuper = true)
+@Slf4j
+public class IcebergPartitionCopyableFile extends CopyableFile {
+
+  /**
+   * Base64-encoded Iceberg {@link DataFile} associated with this copyable 
file.
+   */
+  private String base64EncodedDataFile;
+
+  public IcebergPartitionCopyableFile(CopyableFile copyableFile, DataFile 
dataFile) {
+    super(copyableFile.getOrigin(), copyableFile.getDestination(), 
copyableFile.getDestinationOwnerAndPermission(),
+        copyableFile.getAncestorsOwnerAndPermission(), 
copyableFile.getChecksum(), copyableFile.getPreserve(),
+        copyableFile.getFileSet(), copyableFile.getOriginTimestamp(), 
copyableFile.getUpstreamTimestamp(),
+        copyableFile.getAdditionalMetadata(), copyableFile.datasetOutputPath, 
copyableFile.getDataFileVersionStrategy());
+    this.base64EncodedDataFile = SerializationUtil.serializeToBase64(dataFile);
+  }
+
+  public DataFile getDataFile() {
+    return SerializationUtil.deserializeFromBase64(base64EncodedDataFile);
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
index 658f4a265d..5600b47f08 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -20,15 +20,13 @@ package org.apache.gobblin.data.management.copy.iceberg;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -42,9 +40,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.util.SerializationUtil;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.ImmutableList;
 import com.google.common.base.Preconditions;
@@ -101,48 +97,16 @@ public class IcebergPartitionDataset extends 
IcebergDataset {
     // TODO: Refactor the IcebergDataset::generateCopyEntities to avoid code 
duplication
     //  Differences are getting data files, copying ancestor permission and 
adding post publish steps
     String fileSet = this.getFileSetId();
-    List<CopyEntity> copyEntities = Lists.newArrayList();
     IcebergTable srcIcebergTable = getSrcIcebergTable();
     List<DataFile> srcDataFiles = 
srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
-    Map<Path, DataFile> destDataFileBySrcPath = 
calcDestDataFileBySrcPath(srcDataFiles);
-    Configuration defaultHadoopConfiguration = new Configuration();
-
-    for (Map.Entry<Path, FileStatus> entry : 
calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
-      Path destPath = entry.getKey();
-      FileStatus srcFileStatus = entry.getValue();
-      // TODO: should be the same FS each time; try creating once, reusing 
thereafter, to not recreate wastefully
-      FileSystem actualSourceFs = 
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
-
-      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
-              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), 
copyConfig)
-          .fileSet(fileSet)
-          .datasetOutputPath(targetFs.getUri().getPath())
-          .build();
-
-      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
-      fileEntity.setDestinationData(getDestinationDataset(targetFs));
-      copyEntities.add(fileEntity);
-    }
-
-    // Adding this check to avoid adding post publish step when there are no 
files to copy.
-    List<DataFile> destDataFiles = new 
ArrayList<>(destDataFileBySrcPath.values());
-    if (CollectionUtils.isNotEmpty(destDataFiles)) {
-      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
-    }
-
-    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
-    return copyEntities;
-  }
 
-  private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> 
srcDataFiles)
-      throws IcebergTable.TableNotFoundException {
-    String fileSet = this.getFileSetId();
-    Map<Path, DataFile> destDataFileBySrcPath = new 
ConcurrentHashMap<>(srcDataFiles.size());
     if (srcDataFiles.isEmpty()) {
       log.warn("~{}~ found no data files for partition col : {} with partition 
value : {} to copy", fileSet,
           this.partitionColumnName, this.partitionColValue);
-      return destDataFileBySrcPath;
+      return new ArrayList<>(0);
     }
+
+    // get source & destination write data locations to update data file paths
     TableMetadata srcTableMetadata = 
getSrcIcebergTable().accessTableMetadata();
     TableMetadata destTableMetadata = 
getDestIcebergTable().accessTableMetadata();
     PartitionSpec partitionSpec = destTableMetadata.spec();
@@ -160,17 +124,58 @@ public class IcebergPartitionDataset extends 
IcebergDataset {
           destWriteDataLocation
       );
     }
-    srcDataFiles.forEach(dataFile -> {
+
+    List<CopyEntity> copyEntities = getIcebergParitionCopyEntities(targetFs, 
srcDataFiles, srcWriteDataLocation, destWriteDataLocation, partitionSpec, 
copyConfig);
+    // Adding this check to avoid adding post publish step when there are no 
files to copy.
+    if (CollectionUtils.isNotEmpty(copyEntities)) {
+      copyEntities.add(createOverwritePostPublishStep());
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  private List<CopyEntity> getIcebergParitionCopyEntities(
+      FileSystem targetFs,
+      List<DataFile> srcDataFiles,
+      String srcWriteDataLocation,
+      String destWriteDataLocation,
+      PartitionSpec partitionSpec,
+      CopyConfiguration copyConfig) {
+    String fileSet = this.getFileSetId();
+    Configuration defaultHadoopConfiguration = new Configuration();
+    List<CopyEntity> copyEntities = Collections.synchronizedList(new 
ArrayList<>(srcDataFiles.size()));
+    Function<Path, FileStatus> getFileStatus = 
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
+
+    srcDataFiles.parallelStream().forEach(dataFile -> {
+      // create destination data file from source data file by replacing the 
source path with destination path
       String srcFilePath = dataFile.path().toString();
       Path updatedDestFilePath = relocateDestPath(srcFilePath, 
srcWriteDataLocation, destWriteDataLocation);
       log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet, 
srcFilePath, updatedDestFilePath);
-      destDataFileBySrcPath.put(new Path(srcFilePath), 
DataFiles.builder(partitionSpec)
+      DataFile destDataFile = DataFiles.builder(partitionSpec)
           .copy(dataFile)
           .withPath(updatedDestFilePath.toString())
-          .build());
+          .build();
+
+      // get file status of source file
+      FileStatus srcFileStatus = getFileStatus.apply(new Path(srcFilePath));
+      try {
+        // TODO: should be the same FS each time; try creating once, reusing 
thereafter, to not recreate wastefully
+        FileSystem actualSourceFs = 
getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+        // create copyable file entity
+        CopyableFile fileEntity = 
CopyableFile.fromOriginAndDestination(actualSourceFs, srcFileStatus,
+                targetFs.makeQualified(updatedDestFilePath), 
copyConfig).fileSet(fileSet)
+            .datasetOutputPath(targetFs.getUri().getPath()).build();
+        fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+        fileEntity.setDestinationData(getDestinationDataset(targetFs));
+        // add corresponding data file to each copyable iceberg partition file
+        IcebergPartitionCopyableFile icebergPartitionCopyableFile = new 
IcebergPartitionCopyableFile(fileEntity, destDataFile);
+        copyEntities.add(icebergPartitionCopyableFile);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
     });
-    log.info("~{}~ created {} destination data files", fileSet, 
destDataFileBySrcPath.size());
-    return destDataFileBySrcPath;
+    return copyEntities;
   }
 
   private Path relocateDestPath(String curPathStr, String prefixToBeReplaced, 
String prefixToReplaceWith) {
@@ -186,43 +191,17 @@ public class IcebergPartitionDataset extends 
IcebergDataset {
     return new Path(fileDir, newFileName);
   }
 
-  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, 
DataFile> destDataFileBySrcPath)
-      throws IOException {
-    Function<Path, FileStatus> getFileStatus = 
CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
-    Map<Path, FileStatus> srcFileStatusByDestFilePath = new 
ConcurrentHashMap<>();
-    try {
-      srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
-          .parallelStream()
-          .collect(Collectors.toConcurrentMap(entry -> new 
Path(entry.getValue().path().toString()),
-              entry -> getFileStatus.apply(entry.getKey())));
-    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
-      wrapper.rethrowWrapped();
-    }
-    return srcFileStatusByDestFilePath;
-  }
-
-  private PostPublishStep createOverwritePostPublishStep(List<DataFile> 
destDataFiles) {
-    List<String> serializedDataFiles = 
getBase64EncodedDataFiles(destDataFiles);
-
+  private PostPublishStep createOverwritePostPublishStep() {
     IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new 
IcebergOverwritePartitionsStep(
         this.getDestIcebergTable().getTableId().toString(),
         this.partitionColumnName,
         this.partitionColValue,
-        serializedDataFiles,
         this.properties
     );
 
     return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), 
icebergOverwritePartitionStep, 0);
   }
 
-  private List<String> getBase64EncodedDataFiles(List<DataFile> destDataFiles) 
{
-    List<String> base64EncodedDataFiles = new 
ArrayList<>(destDataFiles.size());
-    for (DataFile dataFile : destDataFiles) {
-      
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
-    }
-    return base64EncodedDataFiles;
-  }
-
   private Predicate<StructLike> createPartitionFilterPredicate() throws 
IOException {
     //TODO: Refactor it later using factory or other way to support different 
types of filter predicate
     // Also take into consideration creation of Expression Filter to be used 
in overwrite api
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
index ef8a0e72e8..0cc77a9aec 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/publisher/CopyDataPublisher.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy.publisher;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
@@ -28,6 +29,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -58,6 +60,8 @@ import 
org.apache.gobblin.data.management.copy.PreserveAttributes;
 import org.apache.gobblin.data.management.copy.entities.CommitStepCopyEntity;
 import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
+import 
org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionCopyableFile;
+import 
org.apache.gobblin.data.management.copy.iceberg.IcebergOverwritePartitionsStep;
 import org.apache.gobblin.data.management.copy.recovery.RecoveryHelper;
 import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import 
org.apache.gobblin.data.management.copy.writer.FileAwareInputStreamDataWriter;
@@ -333,7 +337,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     log.info("[{}] Found {} pre-publish steps and {} post-publish steps.", 
datasetAndPartition.identifier(),
         prePublishSteps.size(), postPublishSteps.size());
 
-    executeCommitSequence(prePublishSteps);
+    executeCommitSequence(prePublishSteps, new ArrayList<>(0));
 
     if (statesHelper.hasAnyCopyableFile()) {
       // Targets are always absolute, so we start moving from root (will skip 
any existing directories).
@@ -351,12 +355,19 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     // ensure every successful state is committed
     // WARNING: this MUST NOT run before the WU is actually executed--hence 
NOT YET for post-publish steps!
     // (that's because `WorkUnitState::getWorkingState()` returns 
`WorkingState.SUCCESSFUL` merely when the overall job succeeded--even for WUs 
yet to execute)
+
+    // maintain a list of iceberg partition data files to be used in 
IcebergOverwritePartitionsStep if applicable.
+    // This is required for the final commit step where all the data files are 
overwritten to partition.
+    List<DataFile> icebergDataFiles = new ArrayList<>();
     for (WorkUnitState wus : statesHelper.getNonPostPublishStates()) {
       if (wus.getWorkingState() == WorkingState.SUCCESSFUL) {
         wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
       }
       CopyEntity copyEntity = CopySource.deserializeCopyEntity(wus);
       if (copyEntity instanceof CopyableFile) {
+        if (copyEntity instanceof IcebergPartitionCopyableFile) {
+          icebergDataFiles.add(((IcebergPartitionCopyableFile) 
copyEntity).getDataFile());
+        }
         CopyableFile copyableFile = (CopyableFile) copyEntity;
         if (wus.getWorkingState() == WorkingState.COMMITTED) {
           // Committed files should exist in destination otherwise FNFE will 
be thrown
@@ -384,7 +395,7 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
     }
 
     // execute `postPublishSteps` after preserving file attributes, as some, 
like `SetPermissionCommitStep`, will themselves set permissions
-    executeCommitSequence(postPublishSteps);
+    executeCommitSequence(postPublishSteps, icebergDataFiles);
 
     // since `postPublishSteps` have now executed, finally ready to ensure 
every successful WU state of those gets committed
     for (WorkUnitState wus : statesHelper.getPostPublishStates()) {
@@ -410,8 +421,16 @@ public class CopyDataPublisher extends DataPublisher 
implements UnpublishedHandl
         Long.toString(datasetOriginTimestamp), 
Long.toString(datasetUpstreamTimestamp), additionalMetadata);
   }
 
-  private static void executeCommitSequence(List<CommitStep> steps) throws 
IOException {
+  private static void executeCommitSequence(List<CommitStep> steps, 
List<DataFile> icebergDataFiles) throws IOException {
     for (CommitStep step : steps) {
+      // if commit step is IcebergOverwritePartitionsStep, set the data files 
to be used for overwriting partitions
+      // Every copy entity in the work unit state has one data file associated 
with it. All data files are collected &
+      // passed to the commit step here.
+      if (step instanceof IcebergOverwritePartitionsStep) {
+        IcebergOverwritePartitionsStep overwriteStep = 
(IcebergOverwritePartitionsStep) step;
+        log.info("Adding {} iceberg data files to 
IcebergOverwritePartitionsStep", icebergDataFiles.size());
+        overwriteStep.setDataFiles(icebergDataFiles);
+      }
       step.execute();
     }
   }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index a446fbb1a7..80da530733 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -447,7 +447,7 @@ public class IcebergDatasetTest {
     String objectType = new Gson().fromJson(json, JsonObject.class)
         .getAsJsonPrimitive("object-type")
         .getAsString();
-    return 
objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile");
+    return 
objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile") || 
objectType.equals("org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionCopyableFile");
   }
 
   private static void 
verifyFsOwnershipAndPermissionPreservation(Collection<CopyEntity> copyEntities, 
Map<Path, FileStatus> expectedPathsAndFileStatuses) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
index e6a626fca7..e4c1774e9a 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
@@ -26,7 +25,6 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.util.SerializationUtil;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
@@ -44,7 +42,6 @@ public class IcebergOverwritePartitionsStepTest {
   private IcebergTable mockIcebergTable;
   private IcebergCatalog mockIcebergCatalog;
   private Properties mockProperties;
-  private List<String> base64EncodedDataFiles;
   private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
 
   @BeforeMethod
@@ -53,24 +50,15 @@ public class IcebergOverwritePartitionsStepTest {
     mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
     mockProperties = new Properties();
 
-    base64EncodedDataFiles = getEncodedDummyDataFiles();
-
     spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr,
-        testPartitionColName, testPartitionColValue, base64EncodedDataFiles, 
mockProperties));
+        testPartitionColName, testPartitionColValue, mockProperties));
+
+    spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
 
     
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
     
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
   }
 
-  private List<String> getEncodedDummyDataFiles() {
-    List<DataFile> dummyDataFiles = createDummyDataFiles();
-    List<String> base64EncodedDataFiles = new 
ArrayList<>(dummyDataFiles.size());
-    for (DataFile dataFile : dummyDataFiles) {
-      
base64EncodedDataFiles.add(SerializationUtil.serializeToBase64(dataFile));
-    }
-    return base64EncodedDataFiles;
-  }
-
   @Test
   public void testNeverIsCompleted() {
     Assert.assertFalse(spyIcebergOverwritePartitionsStep.isCompleted());
@@ -125,7 +113,10 @@ public class IcebergOverwritePartitionsStepTest {
     
mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX
 + "." + RETRY_TIMES,
         Integer.toString(retryCount));
     spyIcebergOverwritePartitionsStep = Mockito.spy(new 
IcebergOverwritePartitionsStep(destTableIdStr,
-        testPartitionColName, testPartitionColValue, base64EncodedDataFiles, 
mockProperties));
+        testPartitionColName, testPartitionColValue, mockProperties));
+
+    spyIcebergOverwritePartitionsStep.setDataFiles(getDummyDataFiles());
+
     
Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
     
Mockito.doReturn(mockIcebergCatalog).when(spyIcebergOverwritePartitionsStep).createDestinationCatalog();
     try {
@@ -142,7 +133,7 @@ public class IcebergOverwritePartitionsStepTest {
     }
   }
 
-  private List<DataFile> createDummyDataFiles() {
+  private List<DataFile> getDummyDataFiles() {
     DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
         .withPath("/path/to/db/foo/data/datafile1.orc")
         .withFileSizeInBytes(1234)

Reply via email to