This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b639f6b75 [GOBBLIN-2159] Support partition level copy in 
Iceberg-Distcp (#4058)
4b639f6b75 is described below

commit 4b639f6b75ff716f8f2903678c6bf89e0b7a1690
Author: Vivek Rai <[email protected]>
AuthorDate: Thu Oct 24 00:17:01 2024 +0530

    [GOBBLIN-2159] Support partition level copy in Iceberg-Distcp (#4058)
---
 .../copy/iceberg/BaseIcebergCatalog.java           |  14 +-
 .../management/copy/iceberg/IcebergCatalog.java    |   4 +-
 .../copy/iceberg/IcebergDatasetFinder.java         |   9 +-
 .../copy/iceberg/IcebergHiveCatalog.java           |   6 +
 .../iceberg/IcebergOverwritePartitionsStep.java    | 164 ++++++++++++
 .../copy/iceberg/IcebergPartitionDataset.java      | 233 ++++++++++++++++
 .../iceberg/IcebergPartitionDatasetFinder.java     |  64 +++++
 .../data/management/copy/iceberg/IcebergTable.java |  85 +++++-
 ...MatchesAnyPropNamePartitionFilterPredicate.java |  67 +++++
 .../IcebergPartitionFilterPredicateUtil.java       |  73 ++++++
 .../copy/iceberg/IcebergDatasetTest.java           |  45 ++--
 .../IcebergOverwritePartitionsStepTest.java        | 156 +++++++++++
 .../copy/iceberg/IcebergPartitionDatasetTest.java  | 292 +++++++++++++++++++++
 .../management/copy/iceberg/IcebergTableTest.java  | 131 ++++++++-
 ...hesAnyPropNamePartitionFilterPredicateTest.java |  60 +++++
 .../IcebergPartitionFilterPredicateUtilTest.java   | 106 ++++++++
 16 files changed, 1475 insertions(+), 34 deletions(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
index 9e2ae53b99..b16e1aaa72 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java
@@ -21,10 +21,12 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 
 /**
  * Base implementation of {@link IcebergCatalog} to access {@link 
IcebergTable} and the
@@ -41,9 +43,15 @@ public abstract class BaseIcebergCatalog implements 
IcebergCatalog {
   }
 
   @Override
-  public IcebergTable openTable(String dbName, String tableName) {
+  public IcebergTable openTable(String dbName, String tableName) throws 
IcebergTable.TableNotFoundException {
+    // Builds the IcebergTable from both this catalog's TableOperations and its loaded Table instance (via
+    // loadTableInstance); a NoSuchTableException raised while doing so is reported as TableNotFoundException.
     TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
-    return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), 
getDatasetDescriptorPlatform(), createTableOperations(tableId), 
this.getCatalogUri());
+    try {
+      return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), 
getDatasetDescriptorPlatform(),
+          createTableOperations(tableId), this.getCatalogUri(), 
loadTableInstance(tableId));
+    } catch (NoSuchTableException ex) {
+      // defend against `org.apache.iceberg.catalog.Catalog::loadTable` 
throwing inside some `@Override` of `loadTableInstance`
+      // NOTE(review): `ex` is not attached as the cause below - confirm whether TableNotFoundException can carry one.
+      throw new IcebergTable.TableNotFoundException(tableId);
+    }
   }
 
   protected Catalog createCompanionCatalog(Map<String, String> properties, 
Configuration configuration) {
@@ -67,4 +75,6 @@ public abstract class BaseIcebergCatalog implements 
IcebergCatalog {
   }
 
   protected abstract TableOperations createTableOperations(TableIdentifier 
tableId);
+
+  /**
+   * Loads the Iceberg {@link Table} for `tableId`; {@link #openTable(String, String)} passes the result into the
+   * {@link IcebergTable} it creates, translating a {@link NoSuchTableException} into
+   * {@link IcebergTable.TableNotFoundException}.
+   */
+  protected abstract Table loadTableInstance(TableIdentifier tableId);
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
index 68e9bb31c6..05ddaf9c52 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java
@@ -29,10 +29,10 @@ import org.apache.iceberg.catalog.TableIdentifier;
 public interface IcebergCatalog {
 
   /** @return table identified by `dbName` and `tableName` */
-  IcebergTable openTable(String dbName, String tableName);
+  IcebergTable openTable(String dbName, String tableName) throws 
IcebergTable.TableNotFoundException;
 
   /** @return table identified by `tableId` */
-  default IcebergTable openTable(TableIdentifier tableId) {
+  default IcebergTable openTable(TableIdentifier tableId) throws 
IcebergTable.TableNotFoundException {
     // CHALLENGE: clearly better to implement in the reverse direction - 
`openTable(String, String)` in terms of `openTable(TableIdentifier)` -
     // but challenging to do at this point, with multiple derived classes 
already "in the wild" that implement `openTable(String, String)`
     return openTable(tableId.namespace().toString(), tableId.name());
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index f6668f5d18..e6afe37877 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -85,7 +85,7 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
   }
 
   protected final FileSystem sourceFs;
-  private final Properties properties;
+  protected final Properties properties;
+  // NOTE(review): widened from private to protected in this change, but IcebergPartitionDatasetFinder reads the
+  // `properties` parameter of createSpecificDataset rather than this field - confirm the widening is needed.
 
   /**
    * Finds all {@link IcebergDataset}s in the file system using the Iceberg 
Catalog.
@@ -153,7 +153,7 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
     IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(destDbName, destTableName);
     // TODO: Rethink strategy to enforce dest iceberg table
     
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, 
destTableName));
-    return new IcebergDataset(srcIcebergTable, destIcebergTable, properties, 
fs, getConfigShouldCopyMetadataPath(properties));
+    return createSpecificDataset(srcIcebergTable, destIcebergTable, 
properties, fs, getConfigShouldCopyMetadataPath(properties));
   }
 
   protected static IcebergCatalog createIcebergCatalog(Properties properties, 
CatalogLocation location) throws IOException {
@@ -165,6 +165,11 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
     return IcebergCatalogFactory.create(icebergCatalogClassName, 
catalogProperties, configuration);
   }
 
+  /**
+   * Factory hook building the dataset for one source/destination table pair; subclasses override it to return a
+   * more specialized {@link IcebergDataset}.
+   *
+   * @return a plain {@link IcebergDataset} over the given tables
+   * @throws IOException if the dataset cannot be constructed
+   */
+  protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
+      Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException {
+    IcebergDataset dataset =
+        new IcebergDataset(srcIcebergTable, destIcebergTable, properties, fs, shouldIncludeMetadataPath);
+    return dataset;
+  }
+
   protected static boolean getConfigShouldCopyMetadataPath(Properties 
properties) {
+    // parses ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH as a boolean, defaulting to
+    // DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH when the property is absent
     return 
Boolean.valueOf(properties.getProperty(ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH,
 DEFAULT_ICEBERG_DATASET_SHOULD_COPY_METADATA_PATH));
   }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
index af541a79a5..27ea723df5 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hive.HiveCatalog;
@@ -61,4 +62,9 @@ public class IcebergHiveCatalog extends BaseIcebergCatalog {
   public boolean tableAlreadyExists(IcebergTable icebergTable) {
+    // delegates to `hc` (presumably the underlying HiveCatalog), keyed on the table's identifier
     return hc.tableExists(icebergTable.getTableId());
   }
+
+  /**
+   * Loads the table through {@code hc.loadTable}; see {@link BaseIcebergCatalog#openTable(String, String)} for how a
+   * {@code NoSuchTableException} raised here is reported.
+   */
+  @Override
+  protected Table loadTableInstance(TableIdentifier tableId) {
+    Table loadedTable = hc.loadTable(tableId);
+    return loadedTable;
+  }
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
new file mode 100644
index 0000000000..dffcbccb27
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStep.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.github.rholder.retry.Attempt;
+import com.github.rholder.retry.RetryException;
+import com.github.rholder.retry.RetryListener;
+import com.github.rholder.retry.Retryer;
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.commit.CommitStep;
+import org.apache.gobblin.util.retry.RetryerFactory;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_INTERVAL_MS;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TYPE;
+import static org.apache.gobblin.util.retry.RetryerFactory.RetryType;
+
+/**
+ * Commit step that overwrites one partition of an Iceberg table.
+ * <p>
+ * This class implements the {@link CommitStep} interface: {@link #execute()} opens the destination table and replaces
+ * the partition selected by a partition column name and value with the given serialized data files, retrying the
+ * overwrite per the retryer configuration found under {@link #OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX}.
+ * </p>
+ */
+@Slf4j
+public class IcebergOverwritePartitionsStep implements CommitStep {
+  private final String destTableIdStr;
+  private final Properties properties;
+  private final byte[] serializedDataFiles;
+  private final String partitionColName;
+  private final String partitionValue;
+  /** Config prefix under which {@link RetryerFactory} overrides for the overwrite retryer may be supplied. */
+  public static final String OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX +
+      ".catalog.overwrite.partitions.retries";
+  // used when no override is configured: FIXED_ATTEMPT retry type, RETRY_TIMES = 3, RETRY_INTERVAL_MS = 3 seconds
+  private static final Config RETRYER_FALLBACK_CONFIG = ConfigFactory.parseMap(ImmutableMap.of(
+      RETRY_INTERVAL_MS, TimeUnit.SECONDS.toMillis(3L),
+      RETRY_TIMES, 3,
+      RETRY_TYPE, RetryType.FIXED_ATTEMPT.name()));
+
+  /**
+   * Constructs an {@code IcebergOverwritePartitionsStep} with the specified parameters.
+   *
+   * @param destTableIdStr the identifier of the destination table as a string (parsed with {@link TableIdentifier#parse})
+   * @param partitionColName the name of the partition column selecting the partition to overwrite
+   * @param partitionValue the value of {@code partitionColName} selecting the partition to overwrite
+   * @param serializedDataFiles the serialized {@code List<DataFile>} (see {@link SerializationUtil}) that replaces the
+   *     partition's contents
+   * @param properties the properties containing destination catalog and retryer configuration
+   */
+  public IcebergOverwritePartitionsStep(String destTableIdStr, String partitionColName, String partitionValue,
+      byte[] serializedDataFiles, Properties properties) {
+    this.destTableIdStr = destTableIdStr;
+    this.partitionColName = partitionColName;
+    this.partitionValue = partitionValue;
+    this.serializedDataFiles = serializedDataFiles;
+    this.properties = properties;
+  }
+
+  /** Always {@code false}: no check is made for a previously completed overwrite. */
+  @Override
+  public boolean isCompleted() {
+    return false;
+  }
+
+  /**
+   * Executes the partition overwrite in the destination Iceberg table, retrying as done in
+   * {@link IcebergRegisterStep#execute()}.
+   *
+   * @throws IOException if an I/O error occurs while opening the destination catalog or table
+   * @throws IllegalStateException if there are no data files to write
+   * @throws RuntimeException wrapping the underlying failure when the overwrite still fails after retrying
+   */
+  @Override
+  public void execute() throws IOException {
+    // Unlike IcebergRegisterStep::execute, which validates dest table metadata has not changed between copy entity
+    // generation and the post-copy commit, do no such validation here, so dest table writes may continue throughout
+    // our copying. any new data written in the meanwhile to THE SAME partitions we are about to overwrite will be
+    // clobbered and replaced by the copy entities from our execution.
+    List<DataFile> dataFiles = SerializationUtil.deserializeFromBytes(this.serializedDataFiles);
+    if (dataFiles == null || dataFiles.isEmpty()) {
+      // IcebergPartitionDataset only creates this step for a non-empty list; fail fast, with context and before
+      // opening the destination catalog, rather than overwriting the partition with nothing.
+      throw new IllegalStateException(String.format(
+          "~%s~ No data files to overwrite partition - partition: %s; value: %s",
+          this.destTableIdStr, this.partitionColName, this.partitionValue));
+    }
+    IcebergTable destTable = createDestinationCatalog().openTable(TableIdentifier.parse(this.destTableIdStr));
+    try {
+      log.info("~{}~ Starting partition overwrite - partition: {}; value: {}; numDataFiles: {}; path[0]: {}",
+          this.destTableIdStr,
+          this.partitionColName,
+          this.partitionValue,
+          dataFiles.size(),
+          dataFiles.get(0).path()
+      );
+      Retryer<Void> overwritePartitionsRetryer = createOverwritePartitionsRetryer();
+      overwritePartitionsRetryer.call(() -> {
+        destTable.overwritePartition(dataFiles, this.partitionColName, this.partitionValue);
+        return null;
+      });
+      log.info("~{}~ Successful partition overwrite - partition: {}; value: {}",
+          this.destTableIdStr,
+          this.partitionColName,
+          this.partitionValue
+      );
+    } catch (ExecutionException executionException) {
+      String msg = String.format("~%s~ Failed to overwrite partitions", this.destTableIdStr);
+      log.error(msg, executionException);
+      throw new RuntimeException(msg, executionException.getCause());
+    } catch (RetryException retryException) {
+      String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
+      String msg = String.format("~%s~ Failure attempting to overwrite partition [num failures: %d] %s",
+          this.destTableIdStr,
+          retryException.getNumberOfFailedAttempts(),
+          interruptedNote);
+      // prefer the last attempt's own exception, which is more informative than the retry wrapper
+      Throwable informativeException = retryException.getLastFailedAttempt().hasException()
+          ? retryException.getLastFailedAttempt().getExceptionCause()
+          : retryException;
+      log.error(msg, informativeException);
+      throw new RuntimeException(msg, informativeException);
+    }
+  }
+
+  /** Creates the destination catalog from {@code properties} (overridable, e.g. for tests). */
+  protected IcebergCatalog createDestinationCatalog() throws IOException {
+    return IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION);
+  }
+
+  /**
+   * Builds the overwrite retryer from any config under {@link #OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX}, falling
+   * back to {@code RETRYER_FALLBACK_CONFIG}, and logging each failed attempt.
+   */
+  private Retryer<Void> createOverwritePartitionsRetryer() {
+    Config config = ConfigFactory.parseProperties(this.properties);
+    Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
+        ? config.getConfig(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX)
+        : ConfigFactory.empty();
+
+    return RetryerFactory.newInstance(retryerOverridesConfig.withFallback(RETRYER_FALLBACK_CONFIG), Optional.of(new RetryListener() {
+      @Override
+      public <V> void onRetry(Attempt<V> attempt) {
+        if (attempt.hasException()) {
+          String msg = String.format("~%s~ Exception while overwriting partitions [attempt: %d; elapsed: %s]",
+              destTableIdStr,
+              attempt.getAttemptNumber(),
+              Duration.ofMillis(attempt.getDelaySinceFirstAttempt()).toString());
+          log.warn(msg, attempt.getExceptionCause());
+        }
+      }
+    }));
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
new file mode 100644
index 0000000000..42582f09e3
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDataset.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.util.SerializationUtil;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.ImmutableList;
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
+import org.apache.gobblin.data.management.copy.CopyableDataset;
+import org.apache.gobblin.util.function.CheckedExceptionFunction;
+import 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate;
+import 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+
+/**
+ * Iceberg Partition dataset implementing {@link CopyableDataset}
+ * <p>
+ * This class extends {@link IcebergDataset}. It copies only those source data files whose partition matches the
+ * configured partition column and value, then overwrites that partition in the destination table through an
+ * {@link IcebergOverwritePartitionsStep} post-publish step.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDataset extends IcebergDataset {
+  // Currently hardcoded these transforms here but eventually it will depend on filter predicate implementation and can
+  // be moved to a common place or inside each filter predicate.
+  private static final List<String> SUPPORTED_TRANSFORMS = ImmutableList.of("identity", "truncate");
+  private final Predicate<StructLike> partitionFilterPredicate;
+  private final String partitionColumnName;
+  private final String partitionColValue;
+
+  /**
+   * @param srcIcebergTable the table to copy from
+   * @param destIcebergTable the table whose partition is overwritten
+   * @param properties job properties, also passed on to the overwrite step
+   * @param sourceFs the source file system
+   * @param shouldIncludeMetadataPath passed through to {@link IcebergDataset}
+   * @param partitionColumnName name of the source partition column used to select data files
+   * @param partitionColValue partition value that selected data files must match
+   * @throws IOException if the base dataset or the partition filter cannot be created
+   * @throws IllegalArgumentException if no usable partition column index is found for `partitionColumnName`
+   *     (per {@link IcebergPartitionFilterPredicateUtil#getPartitionColumnIndex})
+   */
+  public IcebergPartitionDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties,
+      FileSystem sourceFs, boolean shouldIncludeMetadataPath, String partitionColumnName, String partitionColValue)
+      throws IOException {
+    super(srcIcebergTable, destIcebergTable, properties, sourceFs, shouldIncludeMetadataPath);
+    this.partitionColumnName = partitionColumnName;
+    this.partitionColValue = partitionColValue;
+    this.partitionFilterPredicate = createPartitionFilterPredicate();
+  }
+
+  /**
+   * Generates copy entities for partition based data movement.
+   * It finds files specific to the partition and creates destination data files based on the source data files.
+   * Destination paths are relocated from the source to the destination table's write data location (when both are
+   * set) and have a UUID added to the file name to avoid conflicts.
+   *
+   * @param targetFs the target file system
+   * @param copyConfig the copy configuration
+   * @return a collection of copy entities, plus one overwrite post-publish step when there is anything to copy
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
+    // TODO: Refactor the IcebergDataset::generateCopyEntities to avoid code duplication
+    //  Differences are getting data files, copying ancestor permission and adding post publish steps
+    String fileSet = this.getFileSetId();
+    List<CopyEntity> copyEntities = Lists.newArrayList();
+    IcebergTable srcIcebergTable = getSrcIcebergTable();
+    List<DataFile> srcDataFiles = srcIcebergTable.getPartitionSpecificDataFiles(this.partitionFilterPredicate);
+    Map<Path, DataFile> destDataFileBySrcPath = calcDestDataFileBySrcPath(srcDataFiles);
+    Configuration defaultHadoopConfiguration = new Configuration();
+
+    for (Map.Entry<Path, FileStatus> entry : calcSrcFileStatusByDestFilePath(destDataFileBySrcPath).entrySet()) {
+      Path destPath = entry.getKey();
+      FileStatus srcFileStatus = entry.getValue();
+      // TODO: should be the same FS each time; try creating once, reusing thereafter, to not recreate wastefully
+      FileSystem actualSourceFs = getSourceFileSystemFromFileStatus(srcFileStatus, defaultHadoopConfiguration);
+
+      CopyableFile fileEntity = CopyableFile.fromOriginAndDestination(
+              actualSourceFs, srcFileStatus, targetFs.makeQualified(destPath), copyConfig)
+          .fileSet(fileSet)
+          .datasetOutputPath(targetFs.getUri().getPath())
+          .build();
+
+      fileEntity.setSourceData(getSourceDataset(this.sourceFs));
+      fileEntity.setDestinationData(getDestinationDataset(targetFs));
+      copyEntities.add(fileEntity);
+    }
+
+    // Adding this check to avoid adding post publish step when there are no files to copy.
+    List<DataFile> destDataFiles = new ArrayList<>(destDataFileBySrcPath.values());
+    if (CollectionUtils.isNotEmpty(destDataFiles)) {
+      copyEntities.add(createOverwritePostPublishStep(destDataFiles));
+    }
+
+    log.info("~{}~ generated {} copy entities", fileSet, copyEntities.size());
+    return copyEntities;
+  }
+
+  /** Maps each source data file's path to a copy of that data file re-pointed at its (relocated) destination path. */
+  private Map<Path, DataFile> calcDestDataFileBySrcPath(List<DataFile> srcDataFiles)
+      throws IcebergTable.TableNotFoundException {
+    String fileSet = this.getFileSetId();
+    Map<Path, DataFile> destDataFileBySrcPath = Maps.newHashMap();
+    if (srcDataFiles.isEmpty()) {
+      log.warn("~{}~ found no data files for partition col : {} with partition value : {} to copy", fileSet,
+          this.partitionColumnName, this.partitionColValue);
+      return destDataFileBySrcPath;
+    }
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    TableMetadata destTableMetadata = getDestIcebergTable().accessTableMetadata();
+    PartitionSpec partitionSpec = destTableMetadata.spec();
+    // tableMetadata.property(TableProperties.WRITE_DATA_LOCATION, "") returns null if the property is not set and
+    // doesn't respect passed default value, so to avoid NPE in relocation we are setting it to empty string.
+    String srcWriteDataLocation = Optional.ofNullable(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+        "")).orElse("");
+    String destWriteDataLocation = Optional.ofNullable(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION,
+        "")).orElse("");
+    if (StringUtils.isEmpty(srcWriteDataLocation) || StringUtils.isEmpty(destWriteDataLocation)) {
+      // in this case relocateDestPath keeps each source path as-is (apart from the added UUID)
+      log.warn(
+          "~{}~ Either source or destination table does not have write data location : source table write data location : {} , destination table write data location : {}",
+          fileSet,
+          srcWriteDataLocation,
+          destWriteDataLocation
+      );
+    }
+    srcDataFiles.forEach(dataFile -> {
+      String srcFilePath = dataFile.path().toString();
+      Path updatedDestFilePath = relocateDestPath(srcFilePath, srcWriteDataLocation, destWriteDataLocation);
+      log.debug("~{}~ Path changed from Src : {} to Dest : {}", fileSet, srcFilePath, updatedDestFilePath);
+      destDataFileBySrcPath.put(new Path(srcFilePath), DataFiles.builder(partitionSpec)
+          .copy(dataFile)
+          .withPath(updatedDestFilePath.toString())
+          .build());
+    });
+    log.info("~{}~ created {} destination data files", fileSet, destDataFileBySrcPath.size());
+    return destDataFileBySrcPath;
+  }
+
+  /**
+   * Replaces a LEADING `prefixToBeReplaced` of `curPathStr` with `prefixToReplaceWith`, then adds a UUID to the file
+   * name. When either prefix is empty, or `curPathStr` does not start with `prefixToBeReplaced`, the path is kept
+   * as-is (apart from the UUID). A blind {@code String.replace} would misbehave here: an empty target inserts the
+   * replacement between every character, and non-leading occurrences would also be rewritten.
+   */
+  private Path relocateDestPath(String curPathStr, String prefixToBeReplaced, String prefixToReplaceWith) {
+    String updPathStr = curPathStr;
+    if (StringUtils.isNotEmpty(prefixToBeReplaced) && StringUtils.isNotEmpty(prefixToReplaceWith)
+        && curPathStr.startsWith(prefixToBeReplaced)) {
+      updPathStr = prefixToReplaceWith + curPathStr.substring(prefixToBeReplaced.length());
+    }
+    return addUUIDToPath(updPathStr);
+  }
+
+  /** Prefixes the file name of `filePathStr` with a random UUID and a dash, keeping its parent directory. */
+  private Path addUUIDToPath(String filePathStr) {
+    Path filePath = new Path(filePathStr);
+    String newFileName = String.join("-", UUID.randomUUID().toString(), filePath.getName());
+    // reuse the parent Path directly rather than round-tripping it through a String
+    return new Path(filePath.getParent(), newFileName);
+  }
+
+  /** Looks up each source file's status on {@code sourceFs}, keyed by the corresponding destination path. */
+  private Map<Path, FileStatus> calcSrcFileStatusByDestFilePath(Map<Path, DataFile> destDataFileBySrcPath)
+      throws IOException {
+    Function<Path, FileStatus> getFileStatus = CheckedExceptionFunction.wrapToTunneled(this.sourceFs::getFileStatus);
+    Map<Path, FileStatus> srcFileStatusByDestFilePath = Maps.newHashMap();
+    try {
+      srcFileStatusByDestFilePath = destDataFileBySrcPath.entrySet()
+          .stream()
+          .collect(Collectors.toMap(entry -> new Path(entry.getValue().path().toString()),
+              entry -> getFileStatus.apply(entry.getKey())));
+    } catch (CheckedExceptionFunction.WrappedIOException wrapper) {
+      // unwrap the IOException tunneled out of the stream
+      wrapper.rethrowWrapped();
+    }
+    return srcFileStatusByDestFilePath;
+  }
+
+  /** Wraps an {@link IcebergOverwritePartitionsStep} over the serialized destination data files as a post-publish step. */
+  private PostPublishStep createOverwritePostPublishStep(List<DataFile> destDataFiles) {
+    byte[] serializedDataFiles = SerializationUtil.serializeToBytes(destDataFiles);
+
+    IcebergOverwritePartitionsStep icebergOverwritePartitionStep = new IcebergOverwritePartitionsStep(
+        this.getDestIcebergTable().getTableId().toString(),
+        this.partitionColumnName,
+        this.partitionColValue,
+        serializedDataFiles,
+        this.properties
+    );
+
+    return new PostPublishStep(this.getFileSetId(), Maps.newHashMap(), icebergOverwritePartitionStep, 0);
+  }
+
+  /** Builds the predicate selecting source data files whose partition column matches {@code partitionColValue}. */
+  private Predicate<StructLike> createPartitionFilterPredicate() throws IOException {
+    //TODO: Refactor it later using factory or other way to support different types of filter predicate
+    // Also take into consideration creation of Expression Filter to be used in overwrite api
+    TableMetadata srcTableMetadata = getSrcIcebergTable().accessTableMetadata();
+    Optional<Integer> partitionColumnIndexOpt = IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(
+        this.partitionColumnName,
+        srcTableMetadata,
+        SUPPORTED_TRANSFORMS
+    );
+    Preconditions.checkArgument(partitionColumnIndexOpt.isPresent(), String.format(
+        "Partition column %s not found in table %s",
+        this.partitionColumnName, this.getFileSetId()));
+    int partitionColumnIndex = partitionColumnIndexOpt.get();
+    return new IcebergMatchesAnyPropNamePartitionFilterPredicate(partitionColumnIndex, this.partitionColValue);
+  }
+
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
new file mode 100644
index 0000000000..581a265e38
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetFinder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Preconditions;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Finder class for locating and creating partitioned Iceberg datasets.
+ * <p>
+ * This class extends {@link IcebergDatasetFinder} and creates {@link IcebergPartitionDataset} instances for the
+ * source and destination Iceberg tables, selecting the partition from the SOURCE-location-qualified
+ * {@value #ICEBERG_PARTITION_NAME_KEY} and {@value #ICEBERG_PARTITION_VALUE_KEY} properties.
+ * </p>
+ */
+@Slf4j
+public class IcebergPartitionDatasetFinder extends IcebergDatasetFinder {
+  public static final String ICEBERG_PARTITION_NAME_KEY = "partition.name";
+  public static final String ICEBERG_PARTITION_VALUE_KEY = "partition.value";
+
+  public IcebergPartitionDatasetFinder(FileSystem sourceFs, Properties properties) {
+    super(sourceFs, properties);
+  }
+
+  /**
+   * Creates an {@link IcebergPartitionDataset} for the partition named by the source-qualified partition properties.
+   *
+   * @throws IllegalArgumentException if the partition column name or value property is empty
+   * @throws IOException if the dataset cannot be constructed
+   */
+  @Override
+  protected IcebergDataset createSpecificDataset(IcebergTable srcIcebergTable, IcebergTable destIcebergTable,
+      Properties properties, FileSystem fs, boolean shouldIncludeMetadataPath) throws IOException {
+    // TODO: Add Validator for source and destination tables later
+
+    String partitionColumnName = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+        ICEBERG_PARTITION_NAME_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnName),
+        "Partition column name cannot be empty");
+
+    String partitionColumnValue = getLocationQualifiedProperty(properties, IcebergDatasetFinder.CatalogLocation.SOURCE,
+        ICEBERG_PARTITION_VALUE_KEY);
+    Preconditions.checkArgument(StringUtils.isNotEmpty(partitionColumnValue),
+        "Partition value cannot be empty");
+
+    // honor the value the caller passed in, rather than re-deriving it from `properties`
+    return new IcebergPartitionDataset(srcIcebergTable, destIcebergTable, properties, fs,
+        shouldIncludeMetadataPath, partitionColumnName, partitionColumnValue);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index e802e10297..5221007cdc 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -20,20 +20,29 @@ package org.apache.gobblin.data.management.copy.iceberg;
 import java.io.IOException;
 import java.net.URI;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestReader;
+import org.apache.iceberg.OverwriteFiles;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -47,6 +56,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.util.measurement.GrowthMilestoneTracker;
 
 import static 
org.apache.gobblin.data.management.copy.iceberg.IcebergSnapshotInfo.ManifestFileInfo;
 
@@ -77,10 +87,11 @@ public class IcebergTable {
   private final String datasetDescriptorPlatform;
   private final TableOperations tableOps;
   private final String catalogUri;
+  private final Table table;
 
   @VisibleForTesting
-  IcebergTable(TableIdentifier tableId, TableOperations tableOps, String 
catalogUri) {
-    this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, 
tableOps, catalogUri);
+  IcebergTable(TableIdentifier tableId, TableOperations tableOps, String 
catalogUri, Table table) {
+    this(tableId, tableId.toString(), DatasetConstants.PLATFORM_ICEBERG, 
tableOps, catalogUri, table);
   }
 
   /** @return metadata info limited to the most recent (current) snapshot */
@@ -217,4 +228,74 @@ public class IcebergTable {
       this.tableOps.commit(dstMetadata, srcMetadata);
     }
   }
+
+  /**
+   * Retrieves a list of data files from the current snapshot that match the specified partition filter predicate.
+   * <p>
+   *   Only the snapshot's data manifests are scanned (delete manifests are not yet supported). When the table has no
+   *   current snapshot (nothing has been committed yet), there are no data files and an empty list is returned.
+   * </p>
+   *
+   * @param icebergPartitionFilterPredicate the predicate to filter partitions, tested against each data file's
+   *                                        {@link DataFile#partition()} tuple
+   * @return a list of data files that match the partition filter predicate (empty if none)
+   * @throws TableNotFoundException if error occurred while accessing the table metadata
+   * @throws IOException if error occurred while reading a manifest file
+   */
+  public List<DataFile> getPartitionSpecificDataFiles(Predicate<StructLike> icebergPartitionFilterPredicate)
+      throws IOException {
+    TableMetadata tableMetadata = accessTableMetadata();
+    Snapshot currentSnapshot = tableMetadata.currentSnapshot();
+    List<DataFile> knownDataFiles = new ArrayList<>();
+    if (currentSnapshot == null) {
+      // `currentSnapshot()` is null until the first commit; bail out instead of NPE-ing on `snapshotId()`
+      log.info("~{}~ has no current snapshot - no partition data files to return", tableId);
+      return knownDataFiles;
+    }
+    long currentSnapshotId = currentSnapshot.snapshotId();
+    GrowthMilestoneTracker growthMilestoneTracker = new GrowthMilestoneTracker();
+    //TODO: Add support for deleteManifests as well later
+    // Currently supporting dataManifests only
+    List<ManifestFile> dataManifestFiles = currentSnapshot.dataManifests(this.tableOps.io());
+    for (ManifestFile manifestFile : dataManifestFiles) {
+      if (growthMilestoneTracker.isAnotherMilestone(knownDataFiles.size())) {
+        log.info("~{}~ for snapshot '{}' - before manifest-file '{}' '{}' total known iceberg datafiles", tableId,
+            currentSnapshotId,
+            manifestFile.path(),
+            knownDataFiles.size()
+        );
+      }
+      try (ManifestReader<DataFile> manifestReader = ManifestFiles.read(manifestFile, this.tableOps.io());
+          CloseableIterator<DataFile> dataFiles = manifestReader.iterator()) {
+        dataFiles.forEachRemaining(dataFile -> {
+          if (icebergPartitionFilterPredicate.test(dataFile.partition())) {
+            // manifest readers may reuse `DataFile` instances while iterating, so retain an independent copy
+            knownDataFiles.add(dataFile.copy());
+          }
+        });
+      } catch (IOException e) {
+        String errMsg = String.format("~%s~ for snapshot '%d' - Failed to read manifest file: %s", tableId,
+            currentSnapshotId, manifestFile.path());
+        log.error(errMsg, e);
+        throw new IOException(errMsg, e);
+      }
+    }
+    return knownDataFiles;
+  }
+
+  /**
+   * Overwrite partition data files in the table for the specified partition col name & partition value.
+   * <p>
+   *   Overwrite partition replaces the partition using the expression filter provided: within a single commit,
+   *   existing data files selected by {@code partitionColName == partitionValue} are removed and {@code dataFiles}
+   *   are added.
+   * </p>
+   * @param dataFiles the list of data files to replace partitions with; when empty, nothing is committed
+   * @param partitionColName the partition column name whose data files are to be replaced
+   * @param partitionValue  the partition column value on which data files will be replaced
+   * @throws TableNotFoundException if the table metadata cannot be accessed
+   */
+  protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
+      throws TableNotFoundException {
+    if (dataFiles.isEmpty()) {
+      return;
+    }
+    // the table may not have any snapshot yet (e.g. freshly created), so don't dereference it blindly
+    log.info("~{}~ SnapshotId before overwrite: {}", tableId, describeCurrentSnapshotId());
+    OverwriteFiles overwriteFiles = this.table.newOverwrite();
+    overwriteFiles.overwriteByRowFilter(Expressions.equal(partitionColName, partitionValue));
+    dataFiles.forEach(overwriteFiles::addFile);
+    overwriteFiles.commit();
+    this.tableOps.refresh();
+    // Note : this would only arise in a high-frequency commit scenario, but there's no guarantee that the current
+    // snapshot is necessarily the one from the commit just before. another writer could have just raced to commit
+    // in between.
+    log.info("~{}~ SnapshotId after overwrite: {}", tableId, describeCurrentSnapshotId());
+  }
+
+  /** @return the current snapshot's id as a string, or {@code "<none>"} when the table has no current snapshot */
+  private String describeCurrentSnapshotId() throws TableNotFoundException {
+    Snapshot currentSnapshot = accessTableMetadata().currentSnapshot();
+    return currentSnapshot == null ? "<none>" : String.valueOf(currentSnapshot.snapshotId());
+  }
+
 }
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
new file mode 100644
index 0000000000..ee5d6acb28
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicate.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+
+import org.apache.iceberg.StructLike;
+
+/**
+ * A {@link Predicate} over Iceberg partition tuples ({@link StructLike}) that accepts a partition when the entry at a
+ * fixed position of the tuple, rendered via {@code toString()}, equals the configured partition value.
+ * <p>
+ * A {@code null} partition, or a {@code null} entry at the configured position, is never matched.
+ * </p>
+ */
+public class IcebergMatchesAnyPropNamePartitionFilterPredicate implements Predicate<StructLike> {
+  /** position of the partition column within the partition spec, and hence within each partition tuple */
+  private final int partitionColumnIndex;
+  /** expected value, compared against the string form of the tuple entry */
+  private final String partitionValue;
+
+  /**
+   * Creates a predicate matching partitions whose entry at {@code partitionColumnIndex} equals {@code partitionValue}.
+   *
+   * @param partitionColumnIndex the index of the partition column in partition spec
+   * @param partitionValue the partition value used to match
+   */
+  public IcebergMatchesAnyPropNamePartitionFilterPredicate(int partitionColumnIndex, String partitionValue) {
+    this.partitionColumnIndex = partitionColumnIndex;
+    this.partitionValue = partitionValue;
+  }
+
+  /**
+   * Tests whether the given partition tuple carries the configured value at the configured position.
+   *
+   * @param partition the partition tuple to check; may be {@code null}
+   * @return {@code true} only when the entry is non-null and its string form equals the configured value
+   */
+  @Override
+  public boolean test(StructLike partition) {
+    // Defensive guard: data files of a partitioned table shouldn't have a null partition, but never NPE on one
+    if (Objects.isNull(partition)) {
+      return false;
+    }
+    Object candidate = partition.get(this.partitionColumnIndex, Object.class);
+    // short-circuiting keeps `toString()` from being invoked on a null entry
+    return Objects.nonNull(candidate) && this.partitionValue.equals(candidate.toString());
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
new file mode 100644
index 0000000000..358fc9de1e
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.TableMetadata;
+
+/**
+ * Utility class for creating and managing partition filter predicates for Iceberg tables.
+ * <p>
+ * This class provides methods to retrieve the index of a partition column in the table metadata's current
+ * partition spec and ensures that the partition transform is supported.
+ * </p>
+ * <p>
+ * Note: This class is not meant to be instantiated.
+ * </p>
+ */
+public class IcebergPartitionFilterPredicateUtil {
+  private IcebergPartitionFilterPredicateUtil() {
+  }
+
+  /**
+   * Retrieves the index of the partition column from the partition spec in the table metadata.
+   * <p>
+   * Only {@link TableMetadata#spec()} (the table's current spec) is consulted; fields that exist only in older
+   * partition specs are not found.
+   * </p>
+   *
+   * @param partitionColumnName the name of the partition field to find
+   * @param tableMetadata the metadata of the Iceberg table
+   * @param supportedTransforms supported partition transforms, compared against the lower-cased string form of the
+   *                            matching field's transform
+   * @return the index of the partition field if found, otherwise {@link Optional#empty()}
+   * @throws IOException if the partition field is found but its transform is not in {@code supportedTransforms}
+   */
+  public static Optional<Integer> getPartitionColumnIndex(
+      String partitionColumnName,
+      TableMetadata tableMetadata,
+      List<String> supportedTransforms
+  ) throws IOException {
+    List<PartitionField> partitionFields = tableMetadata.spec().fields();
+    for (int idx = 0; idx < partitionFields.size(); idx++) {
+      PartitionField partitionField = partitionFields.get(idx);
+      if (partitionField.name().equals(partitionColumnName)) {
+        // Locale.ROOT: default-locale lower-casing (e.g. Turkish dotless i) must not alter transform names
+        String transform = partitionField.transform().toString().toLowerCase(java.util.Locale.ROOT);
+        if (!supportedTransforms.contains(transform)) {
+          throw new IOException(
+              String.format(" For ~{%s:%d}~ Partition transform %s is not supported. Supported transforms are %s",
+                  partitionColumnName,
+                  idx,
+                  transform,
+                  supportedTransforms));
+        }
+        return Optional.of(idx);
+      }
+    }
+    return Optional.empty();
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
index b9babbc888..a446fbb1a7 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -92,6 +93,7 @@ public class IcebergDatasetTest {
   private static final String MANIFEST_PATH_0 = ROOT_PATH + 
"metadata/manifest.a";
   private static final String MANIFEST_DATA_PATH_0A = ROOT_PATH + "data/p0/a";
   private static final String MANIFEST_DATA_PATH_0B = ROOT_PATH + "data/p0/b";
+  private static final String REGISTER_COMMIT_STEP = 
IcebergRegisterStep.class.getName();
   private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_0 =
       new MockIcebergTable.SnapshotPaths(Optional.of(METADATA_PATH), 
MANIFEST_LIST_PATH_0, Arrays.asList(
           new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_0,
@@ -120,7 +122,10 @@ public class IcebergDatasetTest {
     TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName);
     String qualifiedTableName = "foo_prefix." + tableId.toString();
     String platformName = "Floe";
-    IcebergTable table = new IcebergTable(tableId, qualifiedTableName, 
platformName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI);
+    IcebergTable table = new IcebergTable(tableId, qualifiedTableName, 
platformName,
+        Mockito.mock(TableOperations.class),
+        SRC_CATALOG_URI,
+        Mockito.mock(Table.class));
     FileSystem mockFs = Mockito.mock(FileSystem.class);
     Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI);
     DatasetDescriptor expected = new DatasetDescriptor(platformName, 
URI.create(SRC_CATALOG_URI), qualifiedTableName);
@@ -428,17 +433,17 @@ public class IcebergDatasetTest {
     for (CopyEntity copyEntity : copyEntities) {
       String json = copyEntity.toString();
       if (isCopyableFile(json)) {
-        String filepath = 
CopyEntityDeserializer.getFilePathAsStringFromJson(json);
+        String filepath = 
CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
         actual.add(filepath);
       } else{
-        verifyPostPublishStep(json);
+        verifyPostPublishStep(json, REGISTER_COMMIT_STEP);
       }
     }
     Assert.assertEquals(actual.size(), expected.size(), "Set" + 
actual.toString() + " vs Set" + expected.toString());
     Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray());
   }
 
-  private static boolean isCopyableFile(String json) {
+  public static boolean isCopyableFile(String json) {
     String objectType = new Gson().fromJson(json, JsonObject.class)
         .getAsJsonPrimitive("object-type")
         .getAsString();
@@ -452,14 +457,14 @@ public class IcebergDatasetTest {
         List<CopyEntityDeserializer.FileOwnerAndPermissions> 
ancestorFileOwnerAndPermissionsList =
             
CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson);
         CopyEntityDeserializer.FileOwnerAndPermissions 
destinationFileOwnerAndPermissions = 
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson);
-        Path filePath = new 
Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson));
+        Path filePath = new 
Path(CopyEntityDeserializer.getOriginFilePathAsStringFromJson(copyEntityJson));
         FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath);
         verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus);
         // providing path's parent to verify ancestor owner and permissions
         verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, 
filePath.getParent(),
             expectedPathsAndFileStatuses);
       } else {
-        verifyPostPublishStep(copyEntityJson);
+        verifyPostPublishStep(copyEntityJson, REGISTER_COMMIT_STEP);
       }
     }
   }
@@ -481,8 +486,7 @@ public class IcebergDatasetTest {
     }
   }
 
-  private static void verifyPostPublishStep(String json) {
-    String expectedCommitStep = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergRegisterStep";
+  public static void verifyPostPublishStep(String json, String 
expectedCommitStep) {
     String actualCommitStep = new Gson().fromJson(json, JsonObject.class)
         
.getAsJsonObject("object-data").getAsJsonObject("step").getAsJsonPrimitive("object-type").getAsString();
     Assert.assertEquals(actualCommitStep, expectedCommitStep);
@@ -582,7 +586,7 @@ public class IcebergDatasetTest {
       return fs;
     }
 
-    protected static FileStatus createEmptyFileStatus(String pathString) 
throws IOException {
+    public static FileStatus createEmptyFileStatus(String pathString) throws 
IOException {
       Path path = new Path(pathString);
       FileStatus fileStatus = new FileStatus();
       fileStatus.setPath(path);
@@ -640,7 +644,7 @@ public class IcebergDatasetTest {
     }
   }
 
-  private static class CopyEntityDeserializer {
+  protected static class CopyEntityDeserializer {
 
     @Data
     public static class FileOwnerAndPermissions {
@@ -652,13 +656,20 @@ public class IcebergDatasetTest {
       String otherActionPermission = FsAction.valueOf("READ_WRITE").toString();
     }
 
-    public static String getFilePathAsStringFromJson(String json) {
-      String filepath = new Gson().fromJson(json, JsonObject.class)
-              .getAsJsonObject("object-data")
-              .getAsJsonObject("origin")
-              
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
-              
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
-      return filepath;
+    public static String getOriginFilePathAsStringFromJson(String json) {
+      return new Gson().fromJson(json, JsonObject.class)
+          .getAsJsonObject("object-data")
+          .getAsJsonObject("origin")
+          
.getAsJsonObject("object-data").getAsJsonObject("path").getAsJsonObject("object-data")
+          
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
+    }
+
+    public static String getDestinationFilePathAsStringFromJson(String json) {
+      return new Gson().fromJson(json, JsonObject.class)
+          .getAsJsonObject("object-data")
+          .getAsJsonObject("destination")
+          .getAsJsonObject("object-data")
+          
.getAsJsonObject("uri").getAsJsonPrimitive("object-data").getAsString();
     }
 
     public static List<FileOwnerAndPermissions> 
getAncestorOwnerAndPermissions(String json) {
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
new file mode 100644
index 0000000000..6e273ca2d6
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergOverwritePartitionsStepTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.util.SerializationUtil;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+import static org.apache.gobblin.util.retry.RetryerFactory.RETRY_TIMES;
+
+/** Tests for {@link IcebergOverwritePartitionsStep} */
+public class IcebergOverwritePartitionsStepTest {
+  private final String destTableIdStr = "db.foo";
+  private final String testPartitionColName = "testPartition";
+  private final String testPartitionColValue = "testValue";
+  private IcebergTable mockIcebergTable;
+  private IcebergCatalog mockIcebergCatalog;
+  private Properties mockProperties;
+  private byte[] serializedDummyDataFiles;
+  private IcebergOverwritePartitionsStep spyIcebergOverwritePartitionsStep;
+
+  @BeforeMethod
+  public void setUp() throws IOException {
+    mockIcebergTable = Mockito.mock(IcebergTable.class);
+    mockIcebergCatalog = Mockito.mock(IcebergCatalog.class);
+    mockProperties = new Properties();
+
+    List<DataFile> dummyDataFiles = createDummyDataFiles();
+    serializedDummyDataFiles = SerializationUtil.serializeToBytes(dummyDataFiles);
+
+    spyIcebergOverwritePartitionsStep = createSpyStep(mockProperties);
+  }
+
+  @Test
+  public void testNeverIsCompleted() {
+    Assert.assertFalse(spyIcebergOverwritePartitionsStep.isCompleted());
+  }
+
+  @Test
+  public void testExecute() {
+    try {
+      Mockito.doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(), Mockito.anyString(),
+          Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+      Mockito.verify(mockIcebergTable, Mockito.times(1)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  @Test
+  public void testExecuteWithRetry() {
+    try {
+      // first call throw exception which will be retried and on second call nothing happens
+      Mockito.doThrow(new RuntimeException()).doNothing().when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+      Mockito.verify(mockIcebergTable, Mockito.times(2)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  @Test
+  public void testExecuteWithDefaultRetry() throws IcebergTable.TableNotFoundException {
+    try {
+      // Always throw exception
+      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+      // without this, a step that stopped propagating the failure would let the test pass vacuously
+      // (AssertionError is an Error, so neither catch below intercepts it)
+      Assert.fail("Expected execute() to throw once retries were exhausted");
+    } catch (RuntimeException e) {
+      Mockito.verify(mockIcebergTable, Mockito.times(3)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      assertRetryTimes(e, 3);
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  @Test
+  public void testExecuteWithCustomRetryConfig() throws IOException {
+    int retryCount = 7;
+    mockProperties.setProperty(IcebergOverwritePartitionsStep.OVERWRITE_PARTITIONS_RETRYER_CONFIG_PREFIX + "." + RETRY_TIMES,
+        Integer.toString(retryCount));
+    spyIcebergOverwritePartitionsStep = createSpyStep(mockProperties);
+    try {
+      // Always throw exception
+      Mockito.doThrow(new RuntimeException()).when(mockIcebergTable).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      spyIcebergOverwritePartitionsStep.execute();
+      // guard against a vacuous pass when no exception escapes `execute()`
+      Assert.fail("Expected execute() to throw once retries were exhausted");
+    } catch (RuntimeException e) {
+      Mockito.verify(mockIcebergTable, Mockito.times(retryCount)).overwritePartition(Mockito.anyList(),
+          Mockito.anyString(), Mockito.anyString());
+      assertRetryTimes(e, retryCount);
+    } catch (IOException e) {
+      Assert.fail(String.format("Unexpected IOException : %s", e));
+    }
+  }
+
+  /**
+   * Builds a spy of the step under test, configured with {@code properties}, whose destination catalog is stubbed to
+   * the mock catalog, which in turn opens the mock table.
+   */
+  private IcebergOverwritePartitionsStep createSpyStep(Properties properties) throws IOException {
+    IcebergOverwritePartitionsStep spyStep = Mockito.spy(new IcebergOverwritePartitionsStep(destTableIdStr,
+        testPartitionColName, testPartitionColValue, serializedDummyDataFiles, properties));
+    Mockito.when(mockIcebergCatalog.openTable(Mockito.any(TableIdentifier.class))).thenReturn(mockIcebergTable);
+    Mockito.doReturn(mockIcebergCatalog).when(spyStep).createDestinationCatalog();
+    return spyStep;
+  }
+
+  private List<DataFile> createDummyDataFiles() {
+    DataFile dataFile1 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath("/path/to/db/foo/data/datafile1.orc")
+        .withFileSizeInBytes(1234)
+        .withRecordCount(100)
+        .build();
+
+    DataFile dataFile2 = DataFiles.builder(PartitionSpec.unpartitioned())
+        .withPath("/path/to/db/foo/data/datafile2.orc")
+        .withFileSizeInBytes(9876)
+        .withRecordCount(50)
+        .build();
+
+    return ImmutableList.of(dataFile1, dataFile2);
+  }
+
+  private void assertRetryTimes(RuntimeException re, Integer retryTimes) {
+    String msg = String.format("~%s~ Failure attempting to overwrite partition [num failures: %d]", destTableIdStr, retryTimes);
+    Assert.assertTrue(re.getMessage().startsWith(msg), re.getMessage());
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java
new file mode 100644
index 0000000000..7e50197285
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionDatasetTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.data.management.copy.CopyConfiguration;
+import org.apache.gobblin.data.management.copy.CopyEntity;
+import org.apache.gobblin.data.management.copy.CopyContext;
+import org.apache.gobblin.data.management.copy.PreserveAttributes;
+import 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+
+import static org.mockito.ArgumentMatchers.any;
+
+
+/** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.IcebergPartitionDataset} */
+public class IcebergPartitionDatasetTest {
+  private IcebergTable srcIcebergTable;
+  private IcebergTable destIcebergTable;
+  private TableMetadata srcTableMetadata;
+  private TableMetadata destTableMetadata;
+  private static FileSystem sourceFs;
+  private static FileSystem targetFs;
+  private IcebergPartitionDataset icebergPartitionDataset;
+  private MockedStatic<IcebergPartitionFilterPredicateUtil> 
icebergPartitionFilterPredicateUtil;
+  private static final String SRC_TEST_DB = "srcTestDB";
+  private static final String SRC_TEST_TABLE = "srcTestTable";
+  private static final String SRC_WRITE_LOCATION = SRC_TEST_DB + "/" + 
SRC_TEST_TABLE + "/data";
+  private static final String DEST_TEST_DB = "destTestDB";
+  private static final String DEST_TEST_TABLE = "destTestTable";
+  private static final String DEST_WRITE_LOCATION = DEST_TEST_DB + "/" + 
DEST_TEST_TABLE + "/data";
+  private static final String TEST_ICEBERG_PARTITION_COLUMN_NAME = 
"testPartition";
+  private static final String TEST_ICEBERG_PARTITION_COLUMN_VALUE = 
"testValue";
+  private static final String OVERWRITE_COMMIT_STEP = 
IcebergOverwritePartitionsStep.class.getName();
+  private final Properties copyConfigProperties = new Properties();
+  private final Properties properties = new Properties();
+  private static final URI SRC_FS_URI;
+  private static final URI DEST_FS_URI;
+
+  static {
+    try {
+      SRC_FS_URI = new URI("abc", "the.source.org", "/", null);
+      DEST_FS_URI = new URI("xyz", "the.dest.org", "/", null);
+    } catch (URISyntaxException e) {
+      throw new RuntimeException("should not occur!", e);
+    }
+  }
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    setupSrcFileSystem();
+    setupDestFileSystem();
+
+    TableIdentifier tableIdentifier = TableIdentifier.of(SRC_TEST_DB, 
SRC_TEST_TABLE);
+
+    srcIcebergTable = Mockito.mock(IcebergTable.class);
+    destIcebergTable = Mockito.mock(IcebergTable.class);
+
+    srcTableMetadata = Mockito.mock(TableMetadata.class);
+    destTableMetadata = Mockito.mock(TableMetadata.class);
+    
Mockito.when(destTableMetadata.spec()).thenReturn(Mockito.mock(PartitionSpec.class));
+
+    Mockito.when(srcIcebergTable.getTableId()).thenReturn(tableIdentifier);
+    Mockito.when(destIcebergTable.getTableId()).thenReturn(tableIdentifier);
+    
Mockito.when(srcIcebergTable.accessTableMetadata()).thenReturn(srcTableMetadata);
+    
Mockito.when(destIcebergTable.accessTableMetadata()).thenReturn(destTableMetadata);
+    
Mockito.when(srcIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+    
Mockito.when(destIcebergTable.getDatasetDescriptor(Mockito.any())).thenReturn(Mockito.mock(DatasetDescriptor.class));
+
+    icebergPartitionFilterPredicateUtil = 
Mockito.mockStatic(IcebergPartitionFilterPredicateUtil.class);
+    icebergPartitionFilterPredicateUtil
+        .when(() -> 
IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex(Mockito.anyString(),
 Mockito.any(), Mockito.any()))
+        .thenReturn(Optional.of(0));
+
+    copyConfigProperties.setProperty("data.publisher.final.dir", "/test");
+  }
+
+  @AfterMethod
+  public void cleanUp() {
+    icebergPartitionFilterPredicateUtil.close();
+  }
+
+  @Test
+  public void testGenerateCopyEntities() throws IOException {
+    List<String> srcFilePaths = new ArrayList<>();
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+    Map<String, DataFile> mockDataFilesBySrcPath = 
createDataFileMocksBySrcPath(srcFilePaths);
+    
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+        new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+    icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, 
destIcebergTable, properties, sourceFs,
+        true);
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(targetFs, 
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    Collection<CopyEntity> copyEntities = 
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+    verifyCopyEntities(copyEntities, new 
ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
+  }
+
+  @Test
+  public void testGenerateCopyEntitiesWithEmptyDataFiles() throws IOException {
+    List<DataFile> srcDataFiles = Lists.newArrayList();
+    
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(srcDataFiles);
+
+    icebergPartitionDataset = new IcebergPartitionDataset(srcIcebergTable, 
destIcebergTable, properties, sourceFs,
+        true, TEST_ICEBERG_PARTITION_COLUMN_NAME, 
TEST_ICEBERG_PARTITION_COLUMN_VALUE);
+    Collection<CopyEntity> copyEntities = 
icebergPartitionDataset.generateCopyEntities(targetFs,
+        Mockito.mock(CopyConfiguration.class));
+
+    // Since No data files are present, no copy entities should be generated
+    verifyCopyEntities(copyEntities, Collections.emptyList(), true);
+  }
+
+  @Test
+  public void testMultipleCopyEntitiesGenerated() throws IOException {
+    List<String> srcFilePaths = new ArrayList<>();
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file1.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file2.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file3.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file4.orc");
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/file5.orc");
+
+    Map<String, DataFile> mockDataFilesBySrcPath = 
createDataFileMocksBySrcPath(srcFilePaths);
+    
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+        new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+    icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, 
destIcebergTable, properties, sourceFs,
+        true);
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(targetFs, 
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    Collection<CopyEntity> copyEntities = 
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+    verifyCopyEntities(copyEntities, new 
ArrayList<>(mockDataFilesBySrcPath.keySet()), true);
+  }
+
+  @Test
+  public void testWithDifferentSrcAndDestTableWriteLocation() throws 
IOException {
+    List<String> srcFilePaths = new ArrayList<>();
+    srcFilePaths.add(SRC_WRITE_LOCATION + "/randomFile--Name.orc");
+    
Mockito.when(srcTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, 
"")).thenReturn(SRC_WRITE_LOCATION);
+    
Mockito.when(destTableMetadata.property(TableProperties.WRITE_DATA_LOCATION, 
"")).thenReturn(DEST_WRITE_LOCATION);
+
+    Map<String, DataFile> mockDataFilesBySrcPath = 
createDataFileMocksBySrcPath(srcFilePaths);
+    
Mockito.when(srcIcebergTable.getPartitionSpecificDataFiles(Mockito.any())).thenReturn(
+        new ArrayList<>(mockDataFilesBySrcPath.values()));
+
+    icebergPartitionDataset = new TestIcebergPartitionDataset(srcIcebergTable, 
destIcebergTable, properties, sourceFs,
+        true);
+
+    CopyConfiguration copyConfiguration =
+        CopyConfiguration.builder(targetFs, 
copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString(""))
+            .copyContext(new CopyContext()).build();
+
+    List<CopyEntity> copyEntities =
+        (List<CopyEntity>) 
icebergPartitionDataset.generateCopyEntities(targetFs, copyConfiguration);
+
+    verifyCopyEntities(copyEntities, new 
ArrayList<>(mockDataFilesBySrcPath.keySet()), false);
+  }
+
+  private static void setupSrcFileSystem() throws IOException {
+    sourceFs = Mockito.mock(FileSystem.class);
+    Mockito.when(sourceFs.getUri()).thenReturn(SRC_FS_URI);
+    Mockito.when(sourceFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, 
Path.class).makeQualified(SRC_FS_URI, new Path("/")));
+    
Mockito.when(sourceFs.getFileStatus(any(Path.class))).thenAnswer(invocation -> {
+      Path path = invocation.getArgument(0, Path.class);
+      Path qualifiedPath = sourceFs.makeQualified(path);
+      return 
IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath.toString());
+    });
+  }
+
+  private static void setupDestFileSystem() throws IOException {
+    targetFs = Mockito.mock(FileSystem.class);
+    Mockito.when(targetFs.getUri()).thenReturn(DEST_FS_URI);
+    Mockito.when(targetFs.makeQualified(any(Path.class)))
+        .thenAnswer(invocation -> invocation.getArgument(0, 
Path.class).makeQualified(DEST_FS_URI, new Path("/")));
+    // Since we are adding UUID to the file name for every file while creating 
destination path,
+    // so return file not found exception if trying to find file status on 
destination file system
+    Mockito.when(targetFs.getFileStatus(any(Path.class))).thenThrow(new 
FileNotFoundException());
+  }
+
+  private static Map<String, DataFile> 
createDataFileMocksBySrcPath(List<String> srcFilePaths) throws IOException {
+    Map<String, DataFile> dataFileMocksBySrcPath = new HashMap<>();
+    for (String srcFilePath : srcFilePaths) {
+      DataFile dataFile = Mockito.mock(DataFile.class);
+      Path dataFilePath = new Path(srcFilePath);
+      String qualifiedPath = sourceFs.makeQualified(dataFilePath).toString();
+      Mockito.when(dataFile.path()).thenReturn(dataFilePath.toString());
+      
Mockito.when(sourceFs.getFileStatus(Mockito.eq(dataFilePath))).thenReturn(
+          
IcebergDatasetTest.MockFileSystemBuilder.createEmptyFileStatus(qualifiedPath));
+      dataFileMocksBySrcPath.put(qualifiedPath, dataFile);
+    }
+    return dataFileMocksBySrcPath;
+  }
+
+  private static void verifyCopyEntities(Collection<CopyEntity> copyEntities, 
List<String> expectedSrcFilePaths,
+      boolean sameSrcAndDestWriteLocation) {
+    List<String> actualSrcFilePaths = new ArrayList<>();
+    String srcWriteLocationStart = SRC_FS_URI + SRC_WRITE_LOCATION;
+    String destWriteLocationStart = DEST_FS_URI + (sameSrcAndDestWriteLocation 
? SRC_WRITE_LOCATION : DEST_WRITE_LOCATION);
+    String srcErrorMsg = String.format("Source Location should start with %s", 
srcWriteLocationStart);
+    String destErrorMsg = String.format("Destination Location should start 
with %s", destWriteLocationStart);
+    for (CopyEntity copyEntity : copyEntities) {
+      String json = copyEntity.toString();
+      if (IcebergDatasetTest.isCopyableFile(json)) {
+        String originFilepath = 
IcebergDatasetTest.CopyEntityDeserializer.getOriginFilePathAsStringFromJson(json);
+        actualSrcFilePaths.add(originFilepath);
+        String destFilepath = 
IcebergDatasetTest.CopyEntityDeserializer.getDestinationFilePathAsStringFromJson(json);
+        Assert.assertTrue(originFilepath.startsWith(srcWriteLocationStart), 
srcErrorMsg);
+        Assert.assertTrue(destFilepath.startsWith(destWriteLocationStart), 
destErrorMsg);
+        String originFileName = 
originFilepath.substring(srcWriteLocationStart.length() + 1);
+        String destFileName = 
destFilepath.substring(destWriteLocationStart.length() + 1);
+        Assert.assertTrue(destFileName.endsWith(originFileName), "Incorrect 
file name in destination path");
+        Assert.assertTrue(destFileName.length() > originFileName.length() + 1,
+            "Destination file name should be longer than source file name as 
UUID is appended");
+      } else{
+        IcebergDatasetTest.verifyPostPublishStep(json, OVERWRITE_COMMIT_STEP);
+      }
+    }
+    Assert.assertEquals(actualSrcFilePaths.size(), expectedSrcFilePaths.size(),
+        "Set" + actualSrcFilePaths + " vs Set" + expectedSrcFilePaths);
+    Assert.assertEqualsNoOrder(actualSrcFilePaths.toArray(), 
expectedSrcFilePaths.toArray());
+  }
+
+  /**
+   * See {@link 
org.apache.gobblin.data.management.copy.iceberg.IcebergDatasetTest.TrickIcebergDataset}
+   * */
+  protected static class TestIcebergPartitionDataset extends 
IcebergPartitionDataset {
+
+    public TestIcebergPartitionDataset(IcebergTable srcIcebergTable, 
IcebergTable destIcebergTable,
+        Properties properties, FileSystem sourceFs, boolean 
shouldIncludeMetadataPath) throws IOException {
+      super(srcIcebergTable, destIcebergTable, properties, sourceFs, 
shouldIncludeMetadataPath,
+          TEST_ICEBERG_PARTITION_COLUMN_NAME, 
TEST_ICEBERG_PARTITION_COLUMN_VALUE);
+    }
+
+    @Override
+    protected FileSystem getSourceFileSystemFromFileStatus(FileStatus 
fileStatus, Configuration hadoopConfig) {
+      return this.sourceFs;
+    }
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index a1a29444ed..63aa27221b 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -21,11 +21,13 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -33,8 +35,11 @@ import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.avro.AvroSchemaUtil;
@@ -61,7 +66,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
           .fields()
           .name("id")
           .type()
-          .longType()
+          .stringType()
           .noDefault()
           .endRecord();
   protected static final Schema icebergSchema = 
AvroSchemaUtil.toIceberg(avroDataSchema);
@@ -71,6 +76,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
 
   private final String dbName = "myicebergdb";
   private final String tableName = "justtesting";
+  private final String destTableName = "destTable";
   private TableIdentifier tableId;
   private Table table;
   private String catalogUri;
@@ -85,7 +91,7 @@ public class IcebergTableTest extends HiveMetastoreTest {
   @BeforeMethod
   public void setUpEachTest() {
     tableId = TableIdentifier.of(dbName, tableName);
-    table = catalog.createTable(tableId, icebergSchema);
+    table = catalog.createTable(tableId, icebergSchema, icebergPartitionSpec);
     catalogUri = catalog.getConf().get(CatalogProperties.URI);
     metadataBasePath = calcMetadataBasePath(tableId);
   }
@@ -106,7 +112,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, 
catalog.newTableOps(tableId), catalogUri).getCurrentSnapshotInfo();
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(tableId, 
catalog.newTableOps(tableId), catalogUri,
+        catalog.loadTable(tableId)).getCurrentSnapshotInfo();
     verifySnapshotInfo(snapshotInfo, perSnapshotFilesets, 
perSnapshotFilesets.size());
   }
 
@@ -114,7 +121,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
   @Test(expectedExceptions = IcebergTable.TableNotFoundException.class)
   public void testGetCurrentSnapshotInfoOnBogusTable() throws IOException {
     TableIdentifier bogusTableId = TableIdentifier.of(dbName, tableName + 
"_BOGUS");
-    IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, 
catalog.newTableOps(bogusTableId), catalogUri).getCurrentSnapshotInfo();
+    IcebergSnapshotInfo snapshotInfo = new IcebergTable(bogusTableId, 
catalog.newTableOps(bogusTableId), catalogUri,
+        null).getCurrentSnapshotInfo();
     Assert.fail("expected an exception when using table ID '" + bogusTableId + 
"'");
   }
 
@@ -129,7 +137,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId), 
catalogUri).getAllSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, catalog.loadTable(tableId)).getAllSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num 
snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -149,7 +158,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId), 
catalogUri).getIncrementalSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, 
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num 
snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -169,7 +179,8 @@ public class IcebergTableTest extends HiveMetastoreTest {
     );
 
     initializeSnapshots(table, perSnapshotFilesets);
-    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId), 
catalogUri).getIncrementalSnapshotInfosIterator());
+    List<IcebergSnapshotInfo> snapshotInfos = Lists.newArrayList(new 
IcebergTable(tableId, catalog.newTableOps(tableId),
+        catalogUri, 
catalog.loadTable(tableId)).getIncrementalSnapshotInfosIterator());
     Assert.assertEquals(snapshotInfos.size(), perSnapshotFilesets.size(), "num 
snapshots");
 
     for (int i = 0; i < snapshotInfos.size(); ++i) {
@@ -197,10 +208,11 @@ public class IcebergTableTest extends HiveMetastoreTest {
     // Expect existing property values to be deleted if it does not exist on 
the source
     destTableProperties.put("deletedTableProperty", 
"deletedTablePropertyValue");
 
-    TableIdentifier destTableId = TableIdentifier.of(dbName, "destTable");
+    TableIdentifier destTableId = TableIdentifier.of(dbName, destTableName);
     catalog.createTable(destTableId, icebergSchema, null, destTableProperties);
 
-    IcebergTable destIcebergTable = new IcebergTable(destTableId, 
catalog.newTableOps(destTableId), catalogUri);
+    IcebergTable destIcebergTable = new IcebergTable(destTableId, 
catalog.newTableOps(destTableId), catalogUri,
+        catalog.loadTable(destTableId));
     // Mock a source table with the same table UUID copying new properties
     TableMetadata newSourceTableProperties = 
destIcebergTable.accessTableMetadata().replaceProperties(srcTableProperties);
 
@@ -209,6 +221,86 @@ public class IcebergTableTest extends HiveMetastoreTest {
     
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("newKey"),
 "newValue");
     
Assert.assertEquals(destIcebergTable.accessTableMetadata().properties().get("testKey"),
 "testValueNew");
     
Assert.assertNull(destIcebergTable.accessTableMetadata().properties().get("deletedTableProperty"));
+
+    catalog.dropTable(destTableId);
+  }
+
+  /** Verify that getPartitionSpecificDataFiles return datafiles belonging to 
the partition defined by predicate */
+  @Test
+  public void testGetPartitionSpecificDataFiles() throws IOException {
+    // Note - any specific file path format is not mandatory to be mapped to 
specific partition
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file4.orc",
+        "/path/tableName/data/id=3/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partitionData = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partitionData.set(0, "1");
+
+    addPartitionDataFiles(table, 
createDataFiles(paths.stream().collect(Collectors.toMap(Function.identity(), v 
-> partitionData))));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+    // Using AlwaysTrue & AlwaysFalse Predicate to avoid mocking of predicate 
class
+    Predicate<StructLike> alwaysTruePredicate = partition -> true;
+    Predicate<StructLike> alwaysFalsePredicate = partition -> false;
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysTruePredicate).size(),
 5);
+    
Assert.assertEquals(icebergTable.getPartitionSpecificDataFiles(alwaysFalsePredicate).size(),
 0);
+  }
+
+  /** Verify that overwritePartition replace data files belonging to given 
partition col and value */
+  @Test
+  public void testOverwritePartition() throws IOException {
+    // Note - any specific file path format is not mandatory to be mapped to 
specific partition
+    List<String> paths = Arrays.asList(
+        "/path/tableName/data/id=1/file1.orc",
+        "/path/tableName/data/file2.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partition1Data = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partition1Data.set(0, "1");
+
+    addPartitionDataFiles(table, 
createDataFiles(paths.stream().collect(Collectors.toMap(Function.identity(), v 
-> partition1Data))));
+
+    IcebergTable icebergTable = new IcebergTable(tableId,
+        catalog.newTableOps(tableId),
+        catalogUri,
+        catalog.loadTable(tableId));
+
+    verifyAnyOrder(paths, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    List<String> paths2 = Arrays.asList(
+        "/path/tableName/data/file3.orc",
+        "/path/tableName/data/id=2/file4.orc"
+    );
+    // Using the schema defined in start of this class
+    PartitionData partition2Data = new 
PartitionData(icebergPartitionSpec.partitionType());
+    partition2Data.set(0, "2");
+
+    List<DataFile> partition2DataFiles = 
createDataFiles(paths2.stream().collect(Collectors.toMap(Function.identity(), v 
-> partition2Data)));
+    // here, since partition data with value 2 doesn't exist yet,
+    // we expect it to get added to the table, w/o changing or deleting any 
other partitions
+    icebergTable.overwritePartition(partition2DataFiles, "id", "2");
+    List<String> expectedPaths2 = new ArrayList<>(paths);
+    expectedPaths2.addAll(paths2);
+    verifyAnyOrder(expectedPaths2, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
+
+    List<String> paths3 = Arrays.asList(
+        "/path/tableName/data/id=2/file5.orc",
+        "/path/tableName/data/file6.orc"
+    );
+    // Reusing same partition data to create data file with different paths
+    List<DataFile> partition1NewDataFiles = 
createDataFiles(paths3.stream().collect(Collectors.toMap(Function.identity(), v 
-> partition1Data)));
+    // here, since partition data with value 1 already exists, we expect it to 
get updated in the table with newer path
+    icebergTable.overwritePartition(partition1NewDataFiles, "id", "1");
+    List<String> expectedPaths3 = new ArrayList<>(paths2);
+    expectedPaths3.addAll(paths3);
+    verifyAnyOrder(expectedPaths3, 
icebergTable.getCurrentSnapshotInfo().getAllDataFilePaths(), "data filepaths 
should match");
   }
 
   /** full validation for a particular {@link IcebergSnapshotInfo} */
@@ -333,4 +425,25 @@ public class IcebergTableTest extends HiveMetastoreTest {
   protected static <T, C extends Collection<T>> List<T> flatten(Collection<C> 
cc) {
     return cc.stream().flatMap(x -> x.stream()).collect(Collectors.toList());
   }
+
+  private static void addPartitionDataFiles(Table table, List<DataFile> 
dataFiles) {
+    dataFiles.forEach(dataFile -> 
table.newAppend().appendFile(dataFile).commit());
+  }
+
+  private static List<DataFile> createDataFiles(Map<String, PartitionData> 
pathWithPartitionData) {
+    return pathWithPartitionData.entrySet().stream()
+        .map(e -> createDataFileWithPartition(e.getKey(), e.getValue()))
+        .collect(Collectors.toList());
+  }
+
+  private static DataFile createDataFileWithPartition(String path, 
PartitionData partitionData) {
+    return DataFiles.builder(icebergPartitionSpec)
+        .withPath(path)
+        .withFileSizeInBytes(8)
+        .withRecordCount(1)
+        .withPartition(partitionData)
+        .withFormat(FileFormat.ORC)
+        .build();
+  }
+
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
new file mode 100644
index 0000000000..4eb16500e6
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergMatchesAnyPropNamePartitionFilterPredicateTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import org.apache.iceberg.StructLike;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/** Tests for {@link 
org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergMatchesAnyPropNamePartitionFilterPredicate}
 */
+public class IcebergMatchesAnyPropNamePartitionFilterPredicateTest {
+  private static final String TEST_PARTITION_VALUE_1 = "value1";
+  private IcebergMatchesAnyPropNamePartitionFilterPredicate predicate;
+
+  @BeforeMethod
+  public void setup() {
+    predicate = new IcebergMatchesAnyPropNamePartitionFilterPredicate(0, 
TEST_PARTITION_VALUE_1);
+  }
+
+  @Test
+  public void testPartitionValueNULL() {
+    // Just mocking, so that the partition value is NULL
+    Assert.assertFalse(predicate.test(Mockito.mock(StructLike.class)));
+  }
+
+  @Test
+  public void testWhenPartitionIsNull() {
+    Assert.assertFalse(predicate.test(null));
+  }
+
+  @Test
+  public void testPartitionValueMatch() {
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn("value1");
+    Assert.assertTrue(predicate.test(mockPartition));
+  }
+
+  @Test
+  public void testPartitionValueDoesNotMatch() {
+    StructLike mockPartition = Mockito.mock(StructLike.class);
+    Mockito.when(mockPartition.get(Mockito.anyInt(), 
Mockito.eq(Object.class))).thenReturn("<invalid_value>");
+    Assert.assertFalse(predicate.test(mockPartition));
+  }
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
new file mode 100644
index 0000000000..9743b5ab62
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/predicates/IcebergPartitionFilterPredicateUtilTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg.predicates;
+
+import java.io.IOException;
+import java.util.List;
+
+import java.util.Optional;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.transforms.Transform;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Tests for {@link org.apache.gobblin.data.management.copy.iceberg.predicates.IcebergPartitionFilterPredicateUtil}.
+ *
+ * <p>Every test builds its own mocks through the local helpers instead of sharing a static mutable field,
+ * so the tests do not depend on execution order and stay safe under parallel TestNG runs.
+ */
+public class IcebergPartitionFilterPredicateUtilTest {
+  private static final List<String> SUPPORTED_TRANSFORMS = ImmutableList.of("supported1", "supported2");
+
+  /** A partition field whose transform is not in the supported list is rejected with a descriptive IOException. */
+  @Test
+  public void testPartitionTransformNotSupported() {
+    TableMetadata tableMetadata = mockTableMetadata(mockPartitionField("col1", "unsupported"));
+    IOException exception = Assert.expectThrows(IOException.class,
+        () -> IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1", tableMetadata, SUPPORTED_TRANSFORMS));
+    Assert.assertTrue(exception.getMessage().contains(
+        "Partition transform unsupported is not supported. Supported transforms are [supported1, supported2]"));
+  }
+
+  /** A matching partition field with a supported transform resolves to its position in the spec. */
+  @Test
+  public void testPartitionTransformSupported() throws IOException {
+    TableMetadata tableMetadata = mockTableMetadata(mockPartitionField("col1", "supported1"));
+    Optional<Integer> result =
+        IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col1", tableMetadata, SUPPORTED_TRANSFORMS);
+    Assert.assertEquals(result, Optional.of(0));
+  }
+
+  /** Looking up a column name absent from the partition spec yields an empty result. */
+  @Test
+  public void testPartitionColumnNotFound() throws IOException {
+    TableMetadata tableMetadata = mockTableMetadata(mockPartitionField("col", "supported1"));
+    Optional<Integer> result =
+        IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2", tableMetadata, SUPPORTED_TRANSFORMS);
+    Assert.assertFalse(result.isPresent());
+  }
+
+  /** With several partition fields, the index of the requested (second) field is returned. */
+  @Test
+  public void testPartitionColumnFoundIndex1() throws IOException {
+    TableMetadata tableMetadata = mockTableMetadata(
+        mockPartitionField("col1", "supported1"),
+        mockPartitionField("col2", "supported2"));
+    Optional<Integer> result =
+        IcebergPartitionFilterPredicateUtil.getPartitionColumnIndex("col2", tableMetadata, SUPPORTED_TRANSFORMS);
+    Assert.assertEquals(result, Optional.of(1));
+  }
+
+  /**
+   * Builds a mocked {@link TableMetadata} whose {@code spec().fields()} returns the given fields in order.
+   *
+   * <p>Callers must create the fields before calling this method (as plain arguments), so that no Mockito
+   * stubbing is started while another stubbing is still in progress.
+   */
+  private static TableMetadata mockTableMetadata(PartitionField... partitionFields) {
+    TableMetadata tableMetadata = Mockito.mock(TableMetadata.class);
+    PartitionSpec partitionSpec = Mockito.mock(PartitionSpec.class);
+    Mockito.when(tableMetadata.spec()).thenReturn(partitionSpec);
+    Mockito.when(partitionSpec.fields()).thenReturn(ImmutableList.copyOf(partitionFields));
+    return tableMetadata;
+  }
+
+  /**
+   * Builds a mocked {@link PartitionField} with the given name, whose transform's {@code toString()} is
+   * {@code transformName}.
+   */
+  // Raw Transform: transform() returns a wildcard-typed Transform, which a typed mock cannot be passed to thenReturn() as
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  private static PartitionField mockPartitionField(String name, String transformName) {
+    PartitionField partitionField = Mockito.mock(PartitionField.class);
+    Transform transform = Mockito.mock(Transform.class);
+    Mockito.when(partitionField.name()).thenReturn(name);
+    Mockito.when(partitionField.transform()).thenReturn(transform);
+    Mockito.when(transform.toString()).thenReturn(transformName);
+    return partitionField;
+  }
+}
\ No newline at end of file


Reply via email to