This is an automated email from the ASF dual-hosted git repository.

vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new e98452a049 [GOBBLIN-2231] Implement IcebergSource to enable copying 
Iceberg data files to any dest (#4146)
e98452a049 is described below

commit e98452a0495e38e029b056d8953ba3ab417ae7af
Author: Prateek Khandelwal <[email protected]>
AuthorDate: Mon Nov 10 22:59:47 2025 +0530

    [GOBBLIN-2231] Implement IcebergSource to enable copying Iceberg data files 
to any dest (#4146)
    
    * Implement IcebergSource to enable copying Iceberg data files to any dest
    
    * Introduce partition-aware discovery & lookback period based copy for 
IcebergSource
    
    * Added unit tests for IcebergSource
    
    * Added unit tests for IcebergFileStreamHelper
    
    * Add FilePathWithPartition class in IcebergTable
    
    * Address review comments
    
    * Fix indentation
    
    * Handling for yyyy-MM-dd-00 partition format
    
    * Remove unused import
    
    * Address review comment
    
    * Fix core tests error
    
    * Fix failing test
    
    * Fix failing test
---
 .../copy/iceberg/IcebergFileStreamExtractor.java   |   58 +
 .../copy/iceberg/IcebergFileStreamHelper.java      |  140 +++
 .../management/copy/iceberg/IcebergSource.java     |  604 +++++++++++
 .../data/management/copy/iceberg/IcebergTable.java |  125 +++
 .../copy/iceberg/IcebergFileStreamHelperTest.java  |  302 ++++++
 .../management/copy/iceberg/IcebergSourceTest.java | 1116 ++++++++++++++++++++
 .../management/copy/iceberg/IcebergTableTest.java  |    7 +-
 7 files changed, 2351 insertions(+), 1 deletion(-)

diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
new file mode 100644
index 0000000000..e87b187d74
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
+
+/**
+ * Extractor for file streaming mode that creates FileAwareInputStream for 
each file.
+ * 
+ * This extractor is used when {@code iceberg.record.processing.enabled=false} 
to stream
+ * Iceberg table files as binary data to destinations like Azure, HDFS
+ * 
+ * Each "record" is a {@link FileAwareInputStream} representing one file from
+ * the Iceberg table. The downstream writer handles streaming the file content.
+ */
+@Slf4j
+public class IcebergFileStreamExtractor extends FileBasedExtractor<String, 
FileAwareInputStream> {
+
+  public IcebergFileStreamExtractor(WorkUnitState workUnitState) throws 
IOException {
+    super(workUnitState, new IcebergFileStreamHelper(workUnitState));
+  }
+
+  @Override
+  public String getSchema() {
+    // For file streaming, schema is not used by IdentityConverter; returning 
a constant
+    return "FileAwareInputStream";
+  }
+
+  @Override
+  public Iterator<FileAwareInputStream> downloadFile(String filePath) throws 
IOException {
+    throw new NotImplementedException("Not yet implemented");
+  }
+
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
new file mode 100644
index 0000000000..ebe69ef1ce
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import 
org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
+
+/**
+ * File-based helper for Iceberg file streaming operations.
+ *
+ * This helper supports file streaming mode where Iceberg table files
+ * are streamed as binary data without record-level processing.
+ */
+@Slf4j
+public class IcebergFileStreamHelper implements TimestampAwareFileBasedHelper {
+
+  private final State state;
+  private final Configuration configuration;
+  private FileSystem fileSystem;
+
+  public IcebergFileStreamHelper(State state) {
+    this.state = state;
+    this.configuration = new Configuration();
+
+    // Add any Hadoop configuration from job properties
+    for (String key : state.getPropertyNames()) {
+      if (key.startsWith("fs.") || key.startsWith("hadoop.")) {
+        configuration.set(key, state.getProp(key));
+      }
+    }
+  }
+
+  @Override
+  public void connect() throws FileBasedHelperException {
+    try {
+      this.fileSystem = FileSystem.get(configuration);
+      log.info("Connected to Iceberg file stream helper with FileSystem: {}", 
fileSystem.getClass().getSimpleName());
+    } catch (IOException e) {
+      throw new FileBasedHelperException("Failed to initialize FileSystem for 
Iceberg file streaming", e);
+    }
+  }
+
+  @Override
+  public List<String> ls(String path) throws FileBasedHelperException {
+    try {
+      // For Iceberg, file discovery is handled by IcebergSource
+      // This method returns files from work unit configuration
+      List<String> filesToPull = 
state.getPropAsList(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+      log.debug("Returning {} files for processing", filesToPull.size());
+      return filesToPull;
+    } catch (Exception e) {
+      throw new FileBasedHelperException("Failed to list files", e);
+    }
+  }
+
+  @Override
+  public InputStream getFileStream(String filePath) throws 
FileBasedHelperException {
+    try {
+      Path path = new Path(filePath);
+      FileSystem fs = getFileSystemForPath(path);
+      return fs.open(path);
+    } catch (IOException e) {
+      throw new FileBasedHelperException("Failed to get file stream for: " + 
filePath, e);
+    }
+  }
+
+  @Override
+  public long getFileSize(String filePath) throws FileBasedHelperException {
+    try {
+      Path path = new Path(filePath);
+      FileSystem fs = getFileSystemForPath(path);
+      return fs.getFileStatus(path).getLen();
+    } catch (IOException e) {
+      throw new FileBasedHelperException("Failed to get file size for: " + 
filePath, e);
+    }
+  }
+
+  @Override
+  public long getFileMTime(String filePath) throws FileBasedHelperException {
+    try {
+      Path path = new Path(filePath);
+      FileSystem fs = getFileSystemForPath(path);
+      return fs.getFileStatus(path).getModificationTime();
+    } catch (IOException e) {
+      throw new FileBasedHelperException("Failed to get file modification time 
for: " + filePath, e);
+    }
+  }
+
+  private FileSystem getFileSystemForPath(Path path) throws IOException {
+    // If path has a different scheme than the default FileSystem, get 
scheme-specific FS
+    if (path.toUri().getScheme() != null &&
+      !path.toUri().getScheme().equals(fileSystem.getUri().getScheme())) {
+      return path.getFileSystem(configuration);
+    }
+    return fileSystem;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (fileSystem != null) {
+      try {
+        fileSystem.close();
+        log.info("Closed Iceberg file stream helper and FileSystem 
connection");
+      } catch (IOException e) {
+        log.warn("Error closing FileSystem connection", e);
+        throw e;
+      }
+    } else {
+      log.debug("Closing Iceberg file stream helper - no FileSystem to close");
+    }
+  }
+
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
new file mode 100644
index 0000000000..10baa7846c
--- /dev/null
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from 
Iceberg tables.
+ *
+ * This source reads job configuration, applies date partition filters with 
optional lookback period,
+ * and uses Iceberg's TableScan API to enumerate data files for specific 
partitions. It groups files
+ * into work units for parallel processing.
+ *
+ * <pre>
+ * # Basic configuration
+ * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource
+ * iceberg.database.name=db1
+ * iceberg.table.name=table1
+ * iceberg.catalog.uri=ICEBERG_CATALOG_URI
+ *
+ * # Partition filtering with lookback - Static date (hourly partitions by 
default)
+ * iceberg.filter.enabled=true
+ * iceberg.partition.column=datepartition  # Optional, defaults to 
"datepartition"
+ * iceberg.filter.date=2025-04-01         # Date value (yyyy-MM-dd format)
+ * iceberg.lookback.days=3
+ * # iceberg.hourly.partition.enabled=true  # Optional, defaults to true; 
generates yyyy-MM-dd-00 format
+ *
+ * # Partition filtering with lookback - Scheduled flows (dynamic date)
+ * iceberg.filter.enabled=true
+ * iceberg.filter.date=CURRENT_DATE       # Uses current date at runtime
+ * iceberg.lookback.days=3
+ *
+ * # Standard daily partitions (disable hourly suffix if table uses yyyy-MM-dd 
format)
+ * iceberg.filter.enabled=true
+ * iceberg.filter.date=2025-04-01         # Date in yyyy-MM-dd format
+ * iceberg.hourly.partition.enabled=false # Set false for non-hourly partitions
+ * iceberg.lookback.days=3
+ *
+ * # Copy all data (no filtering)
+ * iceberg.filter.enabled=false
+ * # No filter.date needed - will copy all partitions from current snapshot
+ *
+ * # Bin packing for better resource utilization
+ * gobblin.copy.binPacking.maxSizePerBin=1000000000  # 1GB per bin
+ * </pre>
+ */
+@Slf4j
+public class IcebergSource extends FileBasedSource<String, 
FileAwareInputStream> {
+
+  public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name";
+  public static final String ICEBERG_TABLE_NAME = "iceberg.table.name";
+  public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri";
+  public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class";
+  public static final String DEFAULT_ICEBERG_CATALOG_CLASS = 
"org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_RECORD_PROCESSING_ENABLED = 
"iceberg.record.processing.enabled";
+  public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
+  public static final String ICEBERG_FILES_PER_WORKUNIT = 
"iceberg.files.per.workunit";
+  public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
+  public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
+  public static final String ICEBERG_FILTER_DATE = "iceberg.filter.date"; // 
Date value (e.g., 2025-04-01 or CURRENT_DATE)
+  public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
+  public static final int DEFAULT_LOOKBACK_DAYS = 1;
+  public static final String ICEBERG_PARTITION_COLUMN = 
"iceberg.partition.column"; // configurable partition column name
+  public static final String DEFAULT_DATE_PARTITION_COLUMN = "datepartition"; 
// default date partition column name
+  public static final String CURRENT_DATE_PLACEHOLDER = "CURRENT_DATE"; // 
placeholder for current date
+  public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key";
+  public static final String ICEBERG_PARTITION_VALUES = 
"iceberg.partition.values";
+  public static final String ICEBERG_FILE_PARTITION_PATH = 
"iceberg.file.partition.path";
+  public static final String ICEBERG_HOURLY_PARTITION_ENABLED = 
"iceberg.hourly.partition.enabled";
+  public static final boolean DEFAULT_HOURLY_PARTITION_ENABLED = true;
+  private static final String HOURLY_PARTITION_SUFFIX = "-00";
+  private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
+
+  private Optional<LineageInfo> lineageInfo;
+  private final WorkUnitWeighter weighter = new 
FieldWeighter(WORK_UNIT_WEIGHT);
+
+  /**
+   * Initialize file system helper based on mode (streaming vs record 
processing)
+   */
+  @Override
+  public void initFileSystemHelper(State state) throws 
FileBasedHelperException {
+    // For file streaming mode, we use IcebergFileStreamHelper
+    // For record processing mode, we'll use a different helper (future 
implementation)
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+      ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Future: Initialize helper for record processing
+      throw new UnsupportedOperationException("Record processing mode not yet 
implemented. " +
+        "This will be added when SQL/Data Lake destinations are required.");
+    } else {
+      // Initialize helper for file streaming - now implements 
TimestampAwareFileBasedHelper
+      this.fsHelper = new IcebergFileStreamHelper(state);
+      this.fsHelper.connect();
+    }
+  }
+
+  /**
+   * Get work units by discovering files from Iceberg table
+   * @param state is the source state
+   * @return List<WorkUnit> list of work units
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+    try {
+      initFileSystemHelper(state);
+
+      validateConfiguration(state);
+
+      IcebergCatalog catalog = createCatalog(state);
+      String database = state.getProp(ICEBERG_DATABASE_NAME);
+      String table = state.getProp(ICEBERG_TABLE_NAME);
+      IcebergTable icebergTable = catalog.openTable(database, table);
+
+      List<IcebergTable.FilePathWithPartition> filesWithPartitions = 
discoverPartitionFilePaths(state, icebergTable);
+      log.info("Discovered {} files from Iceberg table {}.{}", 
filesWithPartitions.size(), database, table);
+
+      // Create work units from discovered files
+      List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, 
state, icebergTable);
+
+      // Handle simulate mode - log what would be copied without executing
+      if (state.contains(CopySource.SIMULATE) && 
state.getPropAsBoolean(CopySource.SIMULATE)) {
+        log.info("Simulate mode enabled. Will not execute the copy.");
+        logSimulateMode(workUnits, filesWithPartitions, state);
+        return Lists.newArrayList();
+      }
+
+      // Apply bin packing to work units if configured
+      List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, 
state);
+      log.info("Work unit creation complete. Initial work units: {}, packed 
work units: {}",
+        workUnits.size(), packedWorkUnits.size());
+
+      return Lists.newArrayList(packedWorkUnits);
+
+    } catch (Exception e) {
+      log.error("Failed to create work units for Iceberg table", e);
+      throw new RuntimeException("Failed to create work units", e);
+    }
+  }
+
+  /**
+   * Get extractor based on mode (streaming vs record processing)
+   *
+   * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} 
carrying properties needed by the returned {@link Extractor}
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState 
state) throws IOException {
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+      ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Return record processing extractor
+      throw new UnsupportedOperationException("Record processing mode not yet 
implemented.");
+    } else {
+      // Return file streaming extractor
+      return new IcebergFileStreamExtractor(state);
+    }
+  }
+
+  /**
+   * Discover partition data files using Iceberg TableScan API with optional 
lookback for date partitions.
+   *
+   * <p>This method supports three modes:
+   * <ol>
+   * <li><b>Full table scan (copy all data)</b>: Set {@code 
iceberg.filter.enabled=false}.
+   * Returns all data files from current snapshot. Use this for one-time full 
copies or backfills.</li>
+   * <li><b>Static date partition filter</b>: Set {@code 
iceberg.filter.enabled=true} with a specific date
+   * (e.g., {@code iceberg.filter.date=2025-04-01}). Use this for ad-hoc 
historical data copies.</li>
+   * <li><b>Dynamic date partition filter</b>: Set {@code 
iceberg.filter.enabled=true} with
+   * {@code iceberg.filter.date=CURRENT_DATE}. The {@value 
#CURRENT_DATE_PLACEHOLDER} placeholder
+   * is resolved to the current date at runtime. Use this for daily scheduled 
flows.</li>
+   * </ol>
+   *
+   * <p>The partition column name is configurable via {@code 
iceberg.partition.column}
+   * (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is 
specified separately via
+   * {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
+   *
+   * <p><b>Hourly Partition Support:</b> By default, the {@code -00} suffix is 
appended to all partition values
+   * for tables using hourly partition format ({@code yyyy-MM-dd-HH}). For 
tables using standard daily partition
+   * format ({@code yyyy-MM-dd}), set {@code 
iceberg.hourly.partition.enabled=false}. Users should always provide
+   * dates in standard {@code yyyy-MM-dd} format; the hour suffix is 
automatically managed.
+   *
+   * <p><b>Configuration Examples:</b>
+   * <ul>
+   * <li>Static: {@code iceberg.partition.column=datepartition, 
iceberg.filter.date=2025-04-03, iceberg.lookback.days=3}
+   * discovers: datepartition=2025-04-03, 2025-04-02, 2025-04-01</li>
+   * <li>Dynamic: {@code iceberg.filter.date=CURRENT_DATE, 
iceberg.lookback.days=1}
+   * discovers today's partition only (resolved at runtime)</li>
+   * </ul>
+   *
+   * @param state source state containing filter configuration
+   * @param icebergTable the Iceberg table to scan
+   * @return list of file paths with partition metadata matching the filter 
criteria
+   * @throws IOException if table scan or file discovery fails
+   */
+  private List<IcebergTable.FilePathWithPartition> 
discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws 
IOException {
+    boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, 
true);
+
+    if (!filterEnabled) {
+      log.info("Partition filter disabled, discovering all data files with 
partition metadata from current snapshot");
+      // Use TableScan without filter to get all files with partition metadata 
preserved
+      // This ensures partition structure is maintained even for full table 
copies
+      List<IcebergTable.FilePathWithPartition> result = 
icebergTable.getFilePathsWithPartitionsForFilter(Expressions.alwaysTrue());
+      log.info("Discovered {} data files from current snapshot with partition 
metadata", result.size());
+      return result;
+    }
+
+    String datePartitionColumn = state.getProp(ICEBERG_PARTITION_COLUMN, 
DEFAULT_DATE_PARTITION_COLUMN);
+        
+    String dateValue = state.getProp(ICEBERG_FILTER_DATE);
+    Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
+      "iceberg.filter.date is required when iceberg.filter.enabled=true");
+
+    // Handle CURRENT_DATE placeholder for flows
+    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+      dateValue = LocalDate.now().toString();
+      log.info("Resolved {} placeholder to current date: {}", 
CURRENT_DATE_PLACEHOLDER, dateValue);
+    }
+
+    // Apply lookback period for date partitions
+    // lookbackDays=1 (default) means copy only the specified date
+    // lookbackDays=3 means copy specified date + 2 previous days (total 3 
days)
+    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, 
DEFAULT_LOOKBACK_DAYS);
+    List<String> values = Lists.newArrayList();
+
+    if (lookbackDays >= 1) {
+      log.info("Applying lookback period of {} days for date partition column 
'{}': {}", lookbackDays, datePartitionColumn, dateValue);
+      
+      // Check if hourly partitioning is enabled
+      boolean isHourlyPartition = 
state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, 
DEFAULT_HOURLY_PARTITION_ENABLED);
+      
+      // Parse the date in yyyy-MM-dd format
+      LocalDate start;
+      try {
+        start = LocalDate.parse(dateValue);
+      } catch (java.time.format.DateTimeParseException e) {
+        String errorMsg = String.format(
+          "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. 
Error: %s",
+          ICEBERG_FILTER_DATE, dateValue, e.getMessage());
+        log.error(errorMsg);
+        throw new IllegalArgumentException(errorMsg, e);
+      }
+      
+      for (int i = 0; i < lookbackDays; i++) {
+        String dateOnly = start.minusDays(i).toString();
+        // Append hour suffix if hourly partitioning is enabled
+        String partitionValue = isHourlyPartition ? dateOnly + 
HOURLY_PARTITION_SUFFIX : dateOnly;
+        values.add(partitionValue);
+        log.info("Including partition: {}={}", datePartitionColumn, 
partitionValue);
+      }
+    } else {
+      log.error("lookbackDays < 1, cannot apply lookback. lookbackDays={}", 
lookbackDays);
+      throw new IllegalArgumentException(String.format(
+        "lookback.days must be >= 1, got: %d", lookbackDays));
+    }
+
+    // Store partition info on state for downstream use (extractor, 
destination path mapping)
+    state.setProp(ICEBERG_PARTITION_KEY, datePartitionColumn);
+    state.setProp(ICEBERG_PARTITION_VALUES, String.join(",", values));
+
+    // Use Iceberg TableScan API to get only data files (parquet/orc/avro) for 
specified partitions
+    // TableScan.planFiles() returns DataFiles only - no manifest files or 
metadata files
+    log.info("Executing TableScan with filter: {}={}", datePartitionColumn, 
values);
+    Expression icebergExpr = null;
+    for (String val : values) {
+      Expression e = Expressions.equal(datePartitionColumn, val);
+      icebergExpr = (icebergExpr == null) ? e : Expressions.or(icebergExpr, e);
+    }
+
+    List<IcebergTable.FilePathWithPartition> filesWithPartitions = 
icebergTable.getFilePathsWithPartitionsForFilter(icebergExpr);
+    log.info("Discovered {} data files for partitions: {}", 
filesWithPartitions.size(), values);
+
+    return filesWithPartitions;
+  }
+
+  /**
+   * Create work units from discovered file paths by grouping them for 
parallel processing.
+   *
+   * Files are grouped into work units based on {@code 
iceberg.files.per.workunit} configuration.
+   * Each work unit contains metadata about the files to process.
+   *
+   * @param filesWithPartitions list of file paths with partition metadata to 
process
+   * @param state source state containing job configuration
+   * @param table the Iceberg table being copied
+   * @return list of work units ready for parallel execution
+   */
+  private List<WorkUnit> 
createWorkUnitsFromFiles(List<IcebergTable.FilePathWithPartition> 
filesWithPartitions, SourceState state, IcebergTable table) {
+    List<WorkUnit> workUnits = Lists.newArrayList();
+
+    if (filesWithPartitions.isEmpty()) {
+      log.warn("No files discovered for table {}.{}, returning empty work unit 
list",
+        state.getProp(ICEBERG_DATABASE_NAME), 
state.getProp(ICEBERG_TABLE_NAME));
+      return workUnits;
+    }
+
+    String nameSpace = 
state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "iceberg");
+    String tableName = table.getTableId().name();
+    Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, nameSpace, 
tableName);
+
+    int filesPerWorkUnit = state.getPropAsInt(ICEBERG_FILES_PER_WORKUNIT, 
DEFAULT_FILES_PER_WORKUNIT);
+    List<List<IcebergTable.FilePathWithPartition>> groups = 
Lists.partition(filesWithPartitions, Math.max(1, filesPerWorkUnit));
+    log.info("Grouping {} files into {} work units ({} files per work unit)",
+      filesWithPartitions.size(), groups.size(), filesPerWorkUnit);
+
+    for (int i = 0; i < groups.size(); i++) {
+      List<IcebergTable.FilePathWithPartition> group = groups.get(i);
+      WorkUnit workUnit = new WorkUnit(extract);
+
+      // Store data file paths and their partition metadata separately
+      // Note: Only data files (parquet/orc/avro) are included, no Iceberg 
metadata files
+      List<String> filePaths = Lists.newArrayList();
+      Map<String, String> fileToPartitionPath = Maps.newHashMap();
+      long totalSize = 0L;
+
+      for (IcebergTable.FilePathWithPartition fileWithPartition : group) {
+        String filePath = fileWithPartition.getFilePath();
+        filePaths.add(filePath);
+        // Store partition path for each file
+        fileToPartitionPath.put(filePath, 
fileWithPartition.getPartitionPath());
+        // Accumulate file sizes for work unit weight
+        totalSize += fileWithPartition.getFileSize();
+      }
+
+      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, 
String.join(",", filePaths));
+
+      // Store partition path mapping as JSON for extractor to use
+      workUnit.setProp(ICEBERG_FILE_PARTITION_PATH, new 
com.google.gson.Gson().toJson(fileToPartitionPath));
+
+      // Set work unit size for dynamic scaling (instead of just file count)
+      workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, totalSize);
+
+      // Set work unit weight for bin packing
+      setWorkUnitWeight(workUnit, totalSize);
+
+      // Carry partition info to extractor for destination path mapping
+      if (state.contains(ICEBERG_PARTITION_KEY)) {
+        workUnit.setProp(ICEBERG_PARTITION_KEY, 
state.getProp(ICEBERG_PARTITION_KEY));
+      }
+      if (state.contains(ICEBERG_PARTITION_VALUES)) {
+        workUnit.setProp(ICEBERG_PARTITION_VALUES, 
state.getProp(ICEBERG_PARTITION_VALUES));
+      }
+
+      // Add lineage information for data governance and tracking
+      addLineageSourceInfo(state, workUnit, table);
+      workUnits.add(workUnit);
+
+      log.info("Created work unit {} with {} files, total size: {} bytes", i, 
group.size(), totalSize);
+    }
+
+    return workUnits;
+  }
+
+  /**
+   * Create catalog using existing IcebergDatasetFinder logic
+   */
+  private IcebergCatalog createCatalog(SourceState state) throws IOException {
+    String catalogPrefix = "iceberg.catalog.";
+    Map<String, String> catalogProperties = 
buildMapFromPrefixChildren(state.getProperties(), catalogPrefix);
+
+    Configuration configuration = 
HadoopUtils.getConfFromProperties(state.getProperties());
+    String catalogClassName = catalogProperties.getOrDefault("class", 
DEFAULT_ICEBERG_CATALOG_CLASS);
+
+    return IcebergCatalogFactory.create(catalogClassName, catalogProperties, 
configuration);
+  }
+
+  /**
+   * Build map of properties with given prefix
+   *
+   */
+  private Map<String, String> buildMapFromPrefixChildren(Properties 
properties, String configPrefix) {
+    Map<String, String> catalogProperties = Maps.newHashMap();
+
+    for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+      String key = (String) entry.getKey();
+      if (key.startsWith(configPrefix)) {
+        String relativeKey = key.substring(configPrefix.length());
+        catalogProperties.put(relativeKey, (String) entry.getValue());
+      }
+    }
+
+    String catalogUri = catalogProperties.get("uri");
+    Preconditions.checkNotNull(catalogUri, "Catalog URI is required");
+
+    return catalogProperties;
+  }
+
+  /**
+   * Add lineage information
+   */
+  private void addLineageSourceInfo(SourceState sourceState, WorkUnit 
workUnit, IcebergTable table) {
+    if (this.lineageInfo != null && this.lineageInfo.isPresent()) {
+      String catalogUri = sourceState.getProp(ICEBERG_CATALOG_URI);
+      String database = sourceState.getProp(ICEBERG_DATABASE_NAME);
+      String tableName = sourceState.getProp(ICEBERG_TABLE_NAME);
+
+      DatasetDescriptor source = new DatasetDescriptor(
+        DatasetConstants.PLATFORM_ICEBERG,
+        URI.create(catalogUri),
+        database + "." + tableName
+      );
+
+      source.addMetadata("catalog.uri", catalogUri);
+      source.addMetadata("table.location", getTableLocation(table));
+
+      this.lineageInfo.get().setSource(source, workUnit);
+    }
+  }
+
+  /**
+   * Get table location from Iceberg table metadata
+   * @param table the Iceberg table
+   * @return table location or "unknown" if not available
+   */
+  private String getTableLocation(IcebergTable table) {
+    try {
+      return table.accessTableMetadata().location();
+    } catch (Exception e) {
+      return "unknown";
+    }
+  }
+
+  /**
+   * Validate required configuration properties
+   */
+  private void validateConfiguration(SourceState state) {
+    String database = state.getProp(ICEBERG_DATABASE_NAME);
+    String table = state.getProp(ICEBERG_TABLE_NAME);
+    String catalogUri = state.getProp(ICEBERG_CATALOG_URI);
+
+    if (StringUtils.isBlank(database)) {
+      throw new IllegalArgumentException("iceberg.database.name is required");
+    }
+    if (StringUtils.isBlank(table)) {
+      throw new IllegalArgumentException("iceberg.table.name is required");
+    }
+    if (StringUtils.isBlank(catalogUri)) {
+      throw new IllegalArgumentException("iceberg.catalog.uri is required");
+    }
+  }
+
+  /**
+   * Set work unit weight for bin packing based on total file size.
+   * Ensures a minimum weight to prevent skew in bin packing.
+   *
+   * @param workUnit the work unit to set weight on
+   * @param totalSize total size of files in bytes
+   */
+  private void setWorkUnitWeight(WorkUnit workUnit, long totalSize) {
+    long weight = Math.max(totalSize, 1L);
+    workUnit.setProp(WORK_UNIT_WEIGHT, Long.toString(weight));
+  }
+
+  /**
+   * Apply bin packing to work units if configured.
+   * Groups work units into bins based on size constraints for better resource 
utilization.
+   *
+   * @param workUnits initial list of work units
+   * @param state source state containing bin packing configuration
+   * @return packed work units (or original if bin packing not configured)
+   */
+  private List<? extends WorkUnit> applyBinPacking(List<WorkUnit> workUnits, 
SourceState state) {
+    long maxSizePerBin = 
state.getPropAsLong(CopySource.MAX_SIZE_MULTI_WORKUNITS, 0);
+
+    if (maxSizePerBin <= 0) {
+      log.info("Bin packing disabled (maxSizePerBin={}), returning original 
work units", maxSizePerBin);
+      return workUnits;
+    }
+
+    log.info("Applying bin packing with maxSizePerBin={} bytes", 
maxSizePerBin);
+
+    List<? extends WorkUnit> packedWorkUnits = new 
WorstFitDecreasingBinPacking(maxSizePerBin)
+      .pack(workUnits, this.weighter);
+
+    log.info("Bin packing complete. Initial work units: {}, packed work units: 
{}, max size per bin: {} bytes",
+      workUnits.size(), packedWorkUnits.size(), maxSizePerBin);
+
+    return packedWorkUnits;
+  }
+
+  /**
+   * Log simulate mode information - what would be copied without executing.
+   * Provides detailed information about files, partitions, and sizes for 
dry-run validation.
+   *
+   * @param workUnits work units that would be executed
+   * @param filesWithPartitions discovered files with partition metadata
+   * @param state source state containing job configuration
+   */
+  private void logSimulateMode(List<WorkUnit> workUnits,
+                                  List<IcebergTable.FilePathWithPartition> 
filesWithPartitions,
+                                  SourceState state) {
+    String database = state.getProp(ICEBERG_DATABASE_NAME);
+    String table = state.getProp(ICEBERG_TABLE_NAME);
+
+    String separator = StringUtils.repeat("=", 80);
+    String dashSeparator = StringUtils.repeat("-", 80);
+
+    log.info(separator);
+    log.info("SIMULATE MODE: Iceberg Table Copy Plan");
+    log.info(separator);
+    log.info("Source Table: {}.{}", database, table);
+    log.info("Total Files Discovered: {}", filesWithPartitions.size());
+    log.info("Total Work Units: {}", workUnits.size());
+
+    // Calculate total size
+    long totalSize = 0L;
+    Map<String, Long> partitionSizes = Maps.newLinkedHashMap();
+
+    for (IcebergTable.FilePathWithPartition fileWithPartition : 
filesWithPartitions) {
+      long fileSize = fileWithPartition.getFileSize();
+      totalSize += fileSize;
+
+      String partitionPath = fileWithPartition.getPartitionPath();
+      if (!partitionPath.isEmpty()) {
+        partitionSizes.put(partitionPath, 
partitionSizes.getOrDefault(partitionPath, 0L) + fileSize);
+      }
+    }
+
+    log.info("Total Data Size: {} bytes ({} MB)", totalSize, totalSize / (1024 
* 1024));
+
+    if (!partitionSizes.isEmpty()) {
+      log.info(dashSeparator);
+      log.info("Partition Breakdown:");
+      for (Map.Entry<String, Long> entry : partitionSizes.entrySet()) {
+        long sizeInMB = entry.getValue() / (1024 * 1024);
+        log.info("  Partition: {} -> {} bytes ({} MB)", entry.getKey(), 
entry.getValue(), sizeInMB);
+      }
+    }
+
+    log.info(dashSeparator);
+    log.info("Work Unit Distribution:");
+    for (int i = 0; i < Math.min(workUnits.size(), 10); i++) {
+      WorkUnit wu = workUnits.get(i);
+      String filesToPull = 
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+      int fileCount = filesToPull.isEmpty() ? 0 : 
filesToPull.split(",").length;
+      long wuSize = wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0L);
+      log.info("  WorkUnit[{}]: {} files, {} bytes ({} MB)", i, fileCount, 
wuSize, wuSize / (1024 * 1024));
+    }
+
+    if (workUnits.size() > 10) {
+      log.info("  ... and {} more work units", workUnits.size() - 10);
+    }
+
+    log.info(separator);
+    log.info("Simulate mode: No data will be copied. Set 
iceberg.simulate=false to execute.");
+    log.info(separator);
+  }
+
+}
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 0f6c371e0e..a899f83c8c 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -23,6 +23,7 @@ import java.time.Instant;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Predicate;
@@ -31,18 +32,23 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestReader;
 import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
@@ -52,6 +58,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import lombok.AllArgsConstructor;
@@ -344,4 +351,122 @@ public class IcebergTable {
     }
   }
 
+  /**
+   * Container for file path, partition information, and file size.
+   */
+  public static class FilePathWithPartition {
+    private final String filePath;
+    private final Map<String, String> partitionData;
+    private final long fileSize;
+    
+    /**
+     * Creates a file path with partition and size information.
+     * 
+     * @param filePath the absolute path to the data file
+     * @param partitionData map of partition key-value pairs (e.g., 
"datepartition" -> "2025-04-01")
+     * @param fileSize the size of the file in bytes
+     */
+    public FilePathWithPartition(String filePath, Map<String, String> 
partitionData, long fileSize) {
+      this.filePath = filePath;
+      this.partitionData = partitionData;
+      this.fileSize = fileSize;
+    }
+    
+    public String getFilePath() {
+      return filePath;
+    }
+    
+    public Map<String, String> getPartitionData() {
+      return partitionData;
+    }
+    
+    public long getFileSize() {
+      return fileSize;
+    }
+    
+    public String getPartitionPath() {
+      if (partitionData == null || partitionData.isEmpty()) {
+        return "";
+      }
+      StringBuilder sb = new StringBuilder();
+      for (Map.Entry<String, String> entry : partitionData.entrySet()) {
+        if (sb.length() > 0) {
+          sb.append("/");
+        }
+        sb.append(entry.getKey()).append("=").append(entry.getValue());
+      }
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Return absolute data file paths for files that match the provided Iceberg 
filter expression using TableScan.
+   */
+  public List<String> 
getDataFilePathsForFilter(org.apache.iceberg.expressions.Expression 
filterExpression) {
+    List<String> result = Lists.newArrayList();
+    org.apache.iceberg.TableScan scan = 
this.table.newScan().filter(filterExpression);
+    try (CloseableIterable<org.apache.iceberg.FileScanTask> tasks = 
scan.planFiles()) {
+      for (org.apache.iceberg.FileScanTask task : tasks) {
+        result.add(task.file().path().toString());
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to plan files for filter: " + 
filterExpression, ioe);
+    }
+    return result;
+  }
+  
+  /**
+   * Return file paths with partition information and file size for files 
matching the filter expression.
+   * This method extracts partition values and file size from Iceberg metadata 
and associates them with file paths.
+   */
+  public List<FilePathWithPartition> getFilePathsWithPartitionsForFilter(
+      Expression filterExpression) {
+    List<FilePathWithPartition> result = Lists.newArrayList();
+    TableScan scan = this.table.newScan().filter(filterExpression);
+    PartitionSpec spec = this.table.spec();
+    
+    try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+      for (FileScanTask task : tasks) {
+        String filePath = task.file().path().toString();
+        long fileSize = task.file().fileSizeInBytes();
+        
+        // Extract partition data from the file's partition information
+        Map<String, String> partitionData = Maps.newLinkedHashMap();
+        if (task.file().partition() != null && !spec.isUnpartitioned()) {
+          StructLike partition = task.file().partition();
+          List<PartitionField> fields = spec.fields();
+          
+          for (int i = 0; i < fields.size(); i++) {
+            PartitionField field = fields.get(i);
+            String partitionName = field.name();
+            Object partitionValue = partition.get(i, Object.class);
+            if (partitionValue != null) {
+              partitionData.put(partitionName, partitionValue.toString());
+            }
+          }
+        }
+        
+        result.add(new FilePathWithPartition(filePath, partitionData, 
fileSize));
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to plan files for filter: " + 
filterExpression, ioe);
+    }
+    return result;
+  }
+
+  /**
+   * Return data file paths for files that match any of the specified 
partition values for a given partition field.
+   */
+  public List<String> getDataFilePathsForPartitionValues(String 
partitionField, List<String> partitionValues) {
+    if (partitionValues == null || partitionValues.isEmpty()) {
+      return Lists.newArrayList();
+    }
+    org.apache.iceberg.expressions.Expression expr = null;
+    for (String val : partitionValues) {
+      org.apache.iceberg.expressions.Expression e = 
org.apache.iceberg.expressions.Expressions.equal(partitionField, val);
+      expr = (expr == null) ? e : 
org.apache.iceberg.expressions.Expressions.or(expr, e);
+    }
+    return getDataFilePathsForFilter(expr);
+  }
+
 }
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelperTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelperTest.java
new file mode 100644
index 0000000000..30e195449d
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelperTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+
+/**
+ * Tests for {@link IcebergFileStreamHelper}.
+ */
+public class IcebergFileStreamHelperTest {
+
+  private File tempDir;
+  private File testFile1;
+  private File testFile2;
+  private IcebergFileStreamHelper helper;
+  private State state;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    // Create temp directory and test files
+    tempDir = new File(System.getProperty("java.io.tmpdir"), 
"iceberg-helper-test-" + System.currentTimeMillis());
+    tempDir.mkdirs();
+
+    testFile1 = new File(tempDir, "data-file-1.parquet");
+    try (FileOutputStream fos = new FileOutputStream(testFile1)) {
+      fos.write("Test data for file 1".getBytes());
+    }
+
+    testFile2 = new File(tempDir, "data-file-2.parquet");
+    try (FileOutputStream fos = new FileOutputStream(testFile2)) {
+      fos.write("Test data for file 2 with more content".getBytes());
+    }
+
+    // Set up state with file paths
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+      testFile1.getAbsolutePath() + "," + testFile2.getAbsolutePath());
+    state = new State(properties);
+
+    // Initialize helper
+    helper = new IcebergFileStreamHelper(state);
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    if (helper != null) {
+      helper.close();
+    }
+    // Clean up temp files
+    if (tempDir != null && tempDir.exists()) {
+      deleteDirectory(tempDir);
+    }
+  }
+
+  @Test
+  public void testConnect() throws Exception {
+    helper.connect();
+    // If no exception thrown, connection succeeded
+    Assert.assertTrue(true, "Connection should succeed");
+  }
+
+  @Test
+  public void testListFiles() throws Exception {
+    helper.connect();
+    List<String> files = helper.ls("");
+
+    Assert.assertNotNull(files, "File list should not be null");
+    Assert.assertEquals(files.size(), 2, "Should return 2 files from 
configuration");
+    Assert.assertTrue(files.contains(testFile1.getAbsolutePath()),
+      "Should contain first test file");
+    Assert.assertTrue(files.contains(testFile2.getAbsolutePath()),
+      "Should contain second test file");
+  }
+
+  @Test
+  public void testListFilesWithEmptyConfig() throws Exception {
+    State emptyState = new State();
+    emptyState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+    IcebergFileStreamHelper emptyHelper = new 
IcebergFileStreamHelper(emptyState);
+
+    try {
+      emptyHelper.connect();
+      List<String> files = emptyHelper.ls("");
+      Assert.assertTrue(files.isEmpty(), "Should return empty list for empty 
configuration");
+    } finally {
+      emptyHelper.close();
+    }
+  }
+
+  @Test
+  public void testGetFileStream() throws Exception {
+    helper.connect();
+
+    // Test getting file stream
+    InputStream is = helper.getFileStream(testFile1.getAbsolutePath());
+    Assert.assertNotNull(is, "File stream should not be null");
+
+    // Verify we can read from stream
+    byte[] buffer = new byte[1024];
+    int bytesRead = is.read(buffer);
+    Assert.assertTrue(bytesRead > 0, "Should be able to read bytes from 
stream");
+
+    String content = new String(buffer, 0, bytesRead);
+    Assert.assertEquals(content, "Test data for file 1",
+      "Stream content should match file content");
+
+    is.close();
+  }
+
+  @Test(expectedExceptions = FileBasedHelperException.class)
+  public void testGetFileStreamForNonExistentFile() throws Exception {
+    helper.connect();
+
+    // Test error handling for non-existent file
+    helper.getFileStream("/non/existent/path/file.parquet");
+  }
+
+  @Test
+  public void testGetFileSize() throws Exception {
+    helper.connect();
+
+    // Test getting file size
+    long size1 = helper.getFileSize(testFile1.getAbsolutePath());
+    long size2 = helper.getFileSize(testFile2.getAbsolutePath());
+
+    Assert.assertEquals(size1, "Test data for file 1".getBytes().length,
+      "File size should match actual file size");
+    Assert.assertEquals(size2, "Test data for file 2 with more 
content".getBytes().length,
+      "File size should match actual file size");
+    Assert.assertTrue(size2 > size1, "Second file should be larger than 
first");
+  }
+
+  @Test(expectedExceptions = FileBasedHelperException.class)
+  public void testGetFileSizeForNonExistentFile() throws Exception {
+    helper.connect();
+
+    // Test error handling for non-existent file
+    helper.getFileSize("/non/existent/path/file.parquet");
+  }
+
+  @Test
+  public void testGetFileMTime() throws Exception {
+    helper.connect();
+
+    // Test getting file modification time
+    long mtime1 = helper.getFileMTime(testFile1.getAbsolutePath());
+    long mtime2 = helper.getFileMTime(testFile2.getAbsolutePath());
+
+    Assert.assertTrue(mtime1 > 0, "Modification time should be positive");
+    Assert.assertTrue(mtime2 > 0, "Modification time should be positive");
+
+    // mtime2 should be >= mtime1 since it was created after (or same 
millisecond)
+    Assert.assertTrue(mtime2 >= mtime1,
+      "Second file's mtime should be >= first file's mtime");
+  }
+
+  @Test(expectedExceptions = FileBasedHelperException.class)
+  public void testGetFileMTimeForNonExistentFile() throws Exception {
+    helper.connect();
+
+    // Test error handling for non-existent file
+    helper.getFileMTime("/non/existent/path/file.parquet");
+  }
+
+
+  @Test
+  public void testHadoopConfigurationProperties() throws Exception {
+    // Test that Hadoop configuration properties are properly propagated
+    State stateWithHadoopProps = new State();
+    
stateWithHadoopProps.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+      testFile1.getAbsolutePath());
+
+    // Add custom Hadoop properties
+    String testFsProperty = "fs.custom.impl";
+    String testFsValue = "org.example.CustomFileSystem";
+    String testHadoopProperty = "hadoop.custom.setting";
+    String testHadoopValue = "customValue";
+
+    stateWithHadoopProps.setProp(testFsProperty, testFsValue);
+    stateWithHadoopProps.setProp(testHadoopProperty, testHadoopValue);
+    stateWithHadoopProps.setProp("not.hadoop.property", "shouldNotBeInConfig");
+
+    IcebergFileStreamHelper helperWithProps = new 
IcebergFileStreamHelper(stateWithHadoopProps);
+
+    try {
+      helperWithProps.connect();
+
+      // Verify the properties were set in the Hadoop Configuration via 
reflection
+      java.lang.reflect.Field configField = 
IcebergFileStreamHelper.class.getDeclaredField("configuration");
+      configField.setAccessible(true);
+      org.apache.hadoop.conf.Configuration config =
+        (org.apache.hadoop.conf.Configuration) 
configField.get(helperWithProps);
+
+      Assert.assertEquals(config.get(testFsProperty), testFsValue,
+        "fs.* property should be propagated to Hadoop configuration");
+      Assert.assertEquals(config.get(testHadoopProperty), testHadoopValue,
+        "hadoop.* property should be propagated to Hadoop configuration");
+      Assert.assertNull(config.get("not.hadoop.property"),
+        "Non-Hadoop properties should not be propagated");
+    } finally {
+      helperWithProps.close();
+    }
+  }
+
+  @Test
+  public void testClose() throws Exception {
+    helper.connect();
+
+    // Open a stream
+    InputStream is = helper.getFileStream(testFile1.getAbsolutePath());
+    Assert.assertNotNull(is);
+    is.close();
+
+    // Close helper
+    helper.close();
+
+    // After close, operations should fail
+    try {
+      helper.getFileStream(testFile1.getAbsolutePath());
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof FileBasedHelperException || e instanceof 
IOException,
+        "Should throw appropriate exception after close");
+    }
+  }
+
+
+
+  @Test
+  public void testEmptyFile() throws Exception {
+    // Create empty file
+    File emptyFile = new File(tempDir, "empty.parquet");
+    emptyFile.createNewFile();
+
+    helper.connect();
+
+    // Test edge case: empty file should have size 0
+    long size = helper.getFileSize(emptyFile.getAbsolutePath());
+    Assert.assertEquals(size, 0, "Empty file should have size 0");
+  }
+
+  @Test
+  public void testCrossSchemeFileAccess() throws Exception {
+    helper.connect();
+
+    // Test that getFileSystemForPath correctly handles different schemes
+    // Test 1: Local file path (no scheme)
+    String localPath = testFile1.getAbsolutePath();
+    InputStream localStream = helper.getFileStream(localPath);
+    Assert.assertNotNull(localStream, "Should open stream for local file 
path");
+    localStream.close();
+
+    // Test 2: file:// scheme
+    String fileScheme = "file://" + testFile1.getAbsolutePath();
+    InputStream fileStream = helper.getFileStream(fileScheme);
+    Assert.assertNotNull(fileStream, "Should open stream for file:// scheme");
+    fileStream.close();
+  }
+
+  private void deleteDirectory(File directory) {
+    File[] files = directory.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        if (file.isDirectory()) {
+          deleteDirectory(file);
+        } else {
+          file.delete();
+        }
+      }
+    }
+    directory.delete();
+  }
+
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
new file mode 100644
index 0000000000..b4e6bc7eae
--- /dev/null
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
@@ -0,0 +1,1116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import 
org.apache.gobblin.data.management.copy.iceberg.IcebergTable.FilePathWithPartition;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link IcebergSource}.
+ */
+public class IcebergSourceTest {
+
+  @Mock
+  private IcebergCatalog mockCatalog;
+
+  @Mock
+  private IcebergTable mockTable;
+
+  @Mock
+  private IcebergSnapshotInfo mockSnapshot;
+
+  private IcebergSource icebergSource;
+  private SourceState sourceState;
+  private Properties properties;
+
+  private AutoCloseable mocks;
+
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    mocks = MockitoAnnotations.openMocks(this);
+
+    // Initialize IcebergSource
+    this.icebergSource = new IcebergSource();
+
+    // Set up basic properties
+    this.properties = new Properties();
+    properties.setProperty(IcebergSource.ICEBERG_DATABASE_NAME, "test_db");
+    properties.setProperty(IcebergSource.ICEBERG_TABLE_NAME, "test_table");
+    properties.setProperty(IcebergSource.ICEBERG_CATALOG_URI, 
"https://<test>.com/api/v1");
+    properties.setProperty(IcebergSource.ICEBERG_CATALOG_CLASS, 
"org.apache.gobblin.data.management.copy.iceberg.TestCatalog");
+
+    // Create SourceState
+    this.sourceState = new SourceState(new State(properties));
+    // Set a default top-level broker required by LineageInfo
+    com.typesafe.config.Config emptyConfig = 
com.typesafe.config.ConfigFactory.empty();
+    this.sourceState.setBroker(
+      SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+        emptyConfig,
+        GobblinScopeTypes.GLOBAL.defaultScopeInstance()));
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    if (mocks != null) {
+      mocks.close();
+    }
+  }
+
+  @Test
+  public void testFileStreamingModeWorkUnitCreation() throws Exception {
+    // Set up file streaming mode
+    properties.setProperty(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, 
"false");
+
+    // Mock file discovery via snapshot fallback (no filter)
+    List<String> inputPaths = Arrays.asList(
+      "/data/warehouse/test_table/data/file1.parquet",
+      "/data/warehouse/test_table/data/file2.parquet",
+      "/data/warehouse/test_table/data/file3.parquet",
+      "/data/warehouse/test_table/metadata/manifest-list.avro",
+      "/data/warehouse/test_table/metadata/manifest1.avro"
+    );
+    setupMockFileDiscovery(inputPaths);
+
+    // Discover data-only paths via snapshot manifest info
+    List<String> dataFilePaths = inputPaths.stream()
+      .filter(p -> p.endsWith(".parquet") || p.endsWith(".orc") || 
p.endsWith(".avro"))
+      .filter(p -> !p.contains("manifest"))
+      .collect(Collectors.toList());
+
+    // Convert to FilePathWithPartition (no partition info for snapshot-based 
discovery)
+    List<FilePathWithPartition> filesWithPartitions =
+      dataFilePaths.stream()
+        .map(path -> new FilePathWithPartition(
+          path, new java.util.HashMap<>(), 0L))
+        .collect(Collectors.toList());
+
+    // Invoke private createWorkUnitsFromFiles via reflection
+    Method m = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, 
filesWithPartitions, sourceState, mockTable);
+
+    // Verify single work unit contains all 3 data files by default 
(filesPerWorkUnit default=10)
+    Assert.assertEquals(workUnits.size(), 1, "Should create 1 work unit");
+    WorkUnit wu = workUnits.get(0);
+    String filesToPull = 
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
+    Assert.assertNotNull(filesToPull);
+    Assert.assertEquals(filesToPull.split(",").length, 3);
+
+    // Verify extract info
+    Assert.assertEquals(wu.getExtract().getNamespace(), "iceberg");
+    Assert.assertEquals(wu.getExtract().getTable(), "test_table");
+    Assert.assertEquals(wu.getExtract().getType(), 
Extract.TableType.SNAPSHOT_ONLY);
+  }
+
+  @Test
+  public void testFileStreamingModeExtractorSelection() throws Exception {
+    // Set up file streaming mode
+    WorkUnit dummyWu = WorkUnit.createEmpty();
+    State jobState = new State(properties);
+    WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState);
+    workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, 
"false");
+
+    Extractor<String, FileAwareInputStream> extractor = 
icebergSource.getExtractor(workUnitState);
+    // Verify correct extractor type
+    Assert.assertTrue(extractor instanceof IcebergFileStreamExtractor,
+      "File streaming mode should return IcebergFileStreamExtractor");
+  }
+
+  @Test
+  public void testRecordProcessingExtractorThrows() throws Exception {
+    // Set up record processing mode
+    WorkUnit dummyWu = WorkUnit.createEmpty();
+    State jobState = new State(properties);
+    WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState);
+    workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, 
"true");
+
+    try {
+      icebergSource.getExtractor(workUnitState);
+      Assert.fail("Expected UnsupportedOperationException for record 
processing mode");
+    } catch (UnsupportedOperationException expected) {
+      // Expected exception
+    }
+  }
+
+  @Test
+  public void testConfigurationValidation() throws Exception {
+    // Test missing database name via direct validateConfiguration method
+    properties.remove(IcebergSource.ICEBERG_DATABASE_NAME);
+    sourceState = new SourceState(new State(properties));
+
+    // Use reflection to call private validateConfiguration method
+    Method m = IcebergSource.class.getDeclaredMethod("validateConfiguration", 
SourceState.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState);
+      Assert.fail("Should throw exception for missing database name");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+      
Assert.assertTrue(e.getCause().getMessage().contains("iceberg.database.name is 
required"));
+    }
+  }
+
+  @Test
+  public void testFileGrouping() throws Exception {
+    // Test with more files than files per work unit
+    properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "3");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock 6 files to test grouping
+    List<String> dataFilePaths = Arrays.asList(
+      "file1.parquet", "file2.parquet", "file3.parquet",
+      "file4.parquet", "file5.parquet", "file6.parquet"
+    );
+
+    // Convert to FilePathWithPartition
+    List<FilePathWithPartition> filesWithPartitions =
+      dataFilePaths.stream()
+        .map(path -> new FilePathWithPartition(
+          path, new java.util.HashMap<>(), 0L))
+        .collect(Collectors.toList());
+
+    // Setup table mock
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    // Use reflection to call createWorkUnitsFromFiles directly on data files
+    Method m = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, 
filesWithPartitions, sourceState, mockTable);
+
+    // Should create 2 work units: [3 files], [3 files]
+    Assert.assertEquals(workUnits.size(), 2, "Should create 2 work units for 6 
files with files.per.workunit=3");
+
+    // Verify file distribution
+    int totalFiles = 0;
+    for (WorkUnit workUnit : workUnits) {
+      String filesToPull = 
workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
+      int filesInThisUnit = filesToPull.split(",").length;
+      totalFiles += filesInThisUnit;
+      Assert.assertTrue(filesInThisUnit <= 3, "No work unit should have more 
than 3 files");
+    }
+    Assert.assertEquals(totalFiles, 6, "Total files across all work units 
should be 6");
+  }
+
+  /**
+   * Helper method to set up mock file discovery
+   */
+  private void setupMockFileDiscovery(List<String> filePaths) throws Exception 
{
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockCatalog.openTable("test_db", "test_table")).thenReturn(mockTable);
+    when(mockCatalog.tableAlreadyExists(mockTable)).thenReturn(true);
+    when(mockTable.getCurrentSnapshotInfo()).thenReturn(mockSnapshot);
+
+    // Set up snapshot to return the specified file paths
+    when(mockSnapshot.getManifestListPath()).thenReturn("manifest-list.avro");
+    when(mockSnapshot.getMetadataPath()).thenReturn(Optional.empty());
+
+    // Create manifest info with data files
+    List<String> dataFiles = filePaths.stream()
+      .filter(path -> path.endsWith(".parquet") || path.endsWith(".orc") || 
path.endsWith(".avro"))
+      .filter(path -> !path.contains("manifest"))
+      .collect(Collectors.toList());
+
+    IcebergSnapshotInfo.ManifestFileInfo manifestInfo = new 
IcebergSnapshotInfo.ManifestFileInfo(
+      "manifest1.avro", dataFiles);
+    
when(mockSnapshot.getManifestFiles()).thenReturn(Arrays.asList(manifestInfo));
+  }
+
+  @Test
+  public void testLookbackPeriodLogic() throws Exception {
+    // Test that lookback period correctly discovers multiple days of 
partitions
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock 3 days of partitions
+    List<FilePathWithPartition> filesFor3Days = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-03"), 1000L),
+      new FilePathWithPartition(
+        "/data/file2.parquet", createPartitionMap("datepartition", 
"2025-04-02"), 1000L),
+      new FilePathWithPartition(
+        "/data/file3.parquet", createPartitionMap("datepartition", 
"2025-04-01"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+      .thenReturn(filesFor3Days);
+
+    // Test discovery
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    // Verify all 3 days discovered
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with 
lookback=3");
+
+    // Verify partition values are set correctly
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    // Should contain 3 dates: 2025-04-03, 2025-04-02, 2025-04-01
+    String[] dates = partitionValues.split(",");
+    Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+  }
+
+  @Test
+  public void testCurrentDatePlaceholder() throws Exception {
+    // Test that CURRENT_DATE placeholder is resolved to current date with 
hourly suffix (default)
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock today's partition with hourly format
+    String today = java.time.LocalDate.now().toString();
+    String todayHourly = today + "-00";
+    List<FilePathWithPartition> todayFiles = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
todayHourly), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+      .thenReturn(todayFiles);
+
+    // Test discovery
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    // Verify today's partition is discovered
+    Assert.assertEquals(discovered.size(), 1, "Should discover today's 
partition");
+
+    // Verify partition value has hourly format (default behavior)
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, todayHourly, "Should resolve to 
today's date with -00 suffix");
+  }
+
+  @Test
+  public void testInvalidDateFormatFails() throws Exception {
+    // Test that invalid date format throws proper exception
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "invalid-date"); 
// Invalid date format
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Should throw exception for invalid date format");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // Unwrap the exception from reflection
+      // Should get IllegalArgumentException wrapping the 
DateTimeParseException
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+        "Should throw IllegalArgumentException for invalid date format");
+      Assert.assertTrue(e.getCause().getMessage().contains("Invalid date 
format"),
+        "Error message should indicate invalid date format");
+      Assert.assertTrue(e.getCause().getMessage().contains("yyyy-MM-dd"),
+        "Error message should indicate expected format");
+      // Verify the cause is DateTimeParseException
+      Assert.assertTrue(e.getCause().getCause() instanceof 
java.time.format.DateTimeParseException,
+        "Root cause should be DateTimeParseException");
+    }
+  }
+
+  @Test
+  public void testPartitionFilterConfiguration() throws Exception {
+    // Test with partition filtering enabled
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock partition-aware file discovery with FilePathWithPartition
+    List<FilePathWithPartition> partitionFiles = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/uuid1.parquet", createPartitionMap("datepartition", 
"2025-04-01"), 0L),
+      new FilePathWithPartition(
+        "/data/uuid2.parquet", createPartitionMap("datepartition", 
"2025-04-01"), 0L),
+      new FilePathWithPartition(
+        "/data/uuid3.parquet", createPartitionMap("datepartition", 
"2025-03-31"), 0L),
+      new FilePathWithPartition(
+        "/data/uuid4.parquet", createPartitionMap("datepartition", 
"2025-03-30"), 0L)
+    );
+
+    // Mock the table to return partition-specific files with metadata
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+      .thenReturn(partitionFiles);
+
+    // Use reflection to test discoverPartitionFilePaths
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discoveredFiles =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    // Verify partition filter was applied
+    Assert.assertEquals(discoveredFiles.size(), 4, "Should discover files from 
filtered partitions");
+  }
+
+  @Test
+  public void testPartitionInfoPropagation() throws Exception {
+    // Test that partition info is propagated to work units
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> filesWithPartitions = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/uuid1.parquet", createPartitionMap("datepartition", 
"2025-04-01"), 0L),
+      new FilePathWithPartition(
+        "/data/uuid2.parquet", createPartitionMap("datepartition", 
"2025-04-01"), 0L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    // Set partition info on source state (simulating 
discoverPartitionFilePaths behavior)
+    sourceState.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition");
+    sourceState.setProp(IcebergSource.ICEBERG_PARTITION_VALUES, "2025-04-01");
+
+    // Invoke createWorkUnitsFromFiles
+    Method m = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, 
filesWithPartitions, sourceState, mockTable);
+
+    // Verify partition info is in work unit
+    Assert.assertEquals(workUnits.size(), 1);
+    WorkUnit wu = workUnits.get(0);
+    Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_KEY), 
"datepartition");
+    Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_VALUES), 
"2025-04-01");
+
+    // Verify partition path mapping is stored
+    Assert.assertNotNull(wu.getProp(IcebergSource.ICEBERG_FILE_PARTITION_PATH),
+      "Partition path mapping should be stored in work unit");
+  }
+
+  @Test
+  public void testNoFilterPreservesPartitionMetadata() throws Exception {
+    // Test that when filter is disabled, partition metadata is still 
preserved for partitioned tables
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "false");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock all files from a partitioned table with partition metadata
+    List<FilePathWithPartition> allFilesWithPartitions = Arrays.asList(
+      new FilePathWithPartition(
+        "/warehouse/db/table/datepartition=2025-04-01/file1.parquet",
+        createPartitionMap("datepartition", "2025-04-01"), 1000L),
+      new FilePathWithPartition(
+        "/warehouse/db/table/datepartition=2025-04-01/file2.parquet",
+        createPartitionMap("datepartition", "2025-04-01"), 1500L),
+      new FilePathWithPartition(
+        "/warehouse/db/table/datepartition=2025-04-02/file3.parquet",
+        createPartitionMap("datepartition", "2025-04-02"), 2000L),
+      new FilePathWithPartition(
+        "/warehouse/db/table/datepartition=2025-04-03/file4.parquet",
+        createPartitionMap("datepartition", "2025-04-03"), 2500L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    // Mock TableScan with alwaysTrue() filter to return all files with 
partition metadata
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+      .thenReturn(allFilesWithPartitions);
+
+    // Use reflection to test discoverPartitionFilePaths
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discoveredFiles = 
(List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+    // Verify all files discovered with partition metadata preserved
+    Assert.assertEquals(discoveredFiles.size(), 4, "Should discover all data 
files");
+        
+    // Verify partition metadata is preserved for each file
+    for (FilePathWithPartition file : discoveredFiles) {
+      Assert.assertNotNull(file.getPartitionData(), "Partition data should be 
present");
+      Assert.assertFalse(file.getPartitionData().isEmpty(), "Partition data 
should not be empty");
+      Assert.assertTrue(file.getPartitionData().containsKey("datepartition"),
+        "Should have datepartition key");
+      Assert.assertFalse(file.getPartitionPath().isEmpty(),
+        "Partition path should not be empty");
+      Assert.assertTrue(file.getPartitionPath().startsWith("datepartition="),
+        "Partition path should be in format: datepartition=<date>");
+    }
+        
+    // Verify files are from different partitions
+    java.util.Set<String> uniquePartitions = discoveredFiles.stream()
+      .map(f -> f.getPartitionData().get("datepartition"))
+      .collect(java.util.stream.Collectors.toSet());
+    Assert.assertEquals(uniquePartitions.size(), 3, "Should have files from 3 
different partitions");
+  }
+
+  @Test
+  public void testEmptyFileList() throws Exception {
+    // Test handling of empty file list
+    List<FilePathWithPartition> emptyList = Arrays.asList();
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, 
emptyList, sourceState, mockTable);
+
+    // Should return empty list
+    Assert.assertTrue(workUnits.isEmpty(), "Should return empty work unit list 
for empty file list");
+  }
+
+  @Test
+  public void testSingleFilePerWorkUnit() throws Exception {
+    // Test with files per work unit = 1
+    properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> filesWithPartitions = Arrays.asList(
+      new FilePathWithPartition(
+        "file1.parquet", new java.util.HashMap<>(), 0L),
+      new FilePathWithPartition(
+        "file2.parquet", new java.util.HashMap<>(), 0L),
+      new FilePathWithPartition(
+        "file3.parquet", new java.util.HashMap<>(), 0L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, 
filesWithPartitions, sourceState, mockTable);
+
+    // Should create 3 work units, one per file
+    Assert.assertEquals(workUnits.size(), 3, "Should create one work unit per 
file");
+
+    for (WorkUnit wu : workUnits) {
+      String filesToPull = 
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
+      Assert.assertEquals(filesToPull.split(",").length, 1, "Each work unit 
should have exactly 1 file");
+    }
+  }
+
+  @Test
+  public void testFilterEnabledWithoutDate() throws Exception {
+    // Test that enabling filter without date value throws exception
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.remove(IcebergSource.ICEBERG_FILTER_DATE);
+    sourceState = new SourceState(new State(properties));
+
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+
+    try {
+      m.invoke(icebergSource, sourceState, mockTable);
+      Assert.fail("Expected IllegalArgumentException for missing filter date");
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      // Unwrap the exception from reflection
+      Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+      
Assert.assertTrue(e.getCause().getMessage().contains("iceberg.filter.date is 
required"));
+    }
+  }
+
+  /**
+   * Integration test class that creates real Iceberg tables with multiple 
partitions
+   * and tests partition-specific data file fetching.
+   */
+  public static class IcebergSourcePartitionIntegrationTest extends 
HiveMetastoreTest {
+
+    private static final String TEST_DB_NAME = "test_partition_db";
+    private TableIdentifier partitionedTableId;
+    private Table partitionedTable;
+    private IcebergTable icebergTable;
+
+    // Schema with partition field 'datepartition'
+    private static final org.apache.iceberg.shaded.org.apache.avro.Schema 
partitionedAvroSchema =
+      SchemaBuilder.record("partitioned_test")
+        .fields()
+        .name("id").type().longType().noDefault()
+        .name("datepartition").type().stringType().noDefault()
+        .endRecord();
+    private static final Schema partitionedIcebergSchema =
+      AvroSchemaUtil.toIceberg(partitionedAvroSchema);
+    private static final PartitionSpec partitionSpec =
+      PartitionSpec.builderFor(partitionedIcebergSchema)
+        .identity("datepartition")
+        .build();
+
+    @BeforeClass
+    public void setUp() throws Exception {
+      // Start metastore and create test namespace
+      try {
+        startMetastore();
+      } catch (Exception e) {
+        // Metastore may already be started if another test class ran first
+        // The startMetastore() method creates a default 'hivedb' which will 
fail if already exists
+      }
+      catalog.createNamespace(Namespace.of(TEST_DB_NAME));
+    }
+
+    @BeforeMethod
+    public void setUpEachTest() {
+      // Create a partitioned table for each test
+      partitionedTableId = TableIdentifier.of(TEST_DB_NAME, 
"partitioned_table");
+      partitionedTable = catalog.createTable(partitionedTableId, 
partitionedIcebergSchema, partitionSpec,
+        java.util.Collections.singletonMap("format-version", "2"));
+
+      // Add data files for multiple partitions
+      addDataFilesForPartition("2025-04-01", java.util.Arrays.asList(
+        
"/data/warehouse/test_db/partitioned_table/datepartition=2025-04-01/file1.parquet",
+        
"/data/warehouse/test_db/partitioned_table/datepartition=2025-04-01/file2.parquet"
+      ));
+      addDataFilesForPartition("2025-04-02", java.util.Arrays.asList(
+        
"/data/warehouse/test_db/partitioned_table/datepartition=2025-04-02/file3.parquet"
+      ));
+      addDataFilesForPartition("2025-04-03", java.util.Arrays.asList(
+        
"/data/warehouse/test_db/partitioned_table/datepartition=2025-04-03/file4.parquet",
+        
"/data/warehouse/test_db/partitioned_table/datepartition=2025-04-03/file5.parquet",
+        
"/data/warehouse/test_db/partitioned_table/datepartition=2025-04-03/file6.parquet"
+      ));
+
+      // Create IcebergTable wrapper
+      icebergTable = new IcebergTable(
+        partitionedTableId,
+        catalog.newTableOps(partitionedTableId),
+        catalog.getConf().get(CatalogProperties.URI),
+        catalog.loadTable(partitionedTableId)
+      );
+    }
+
+    @AfterMethod
+    public void cleanUpEachTest() {
+      // Clean up partitioned table
+      if (partitionedTableId != null && catalog != null) {
+        try {
+          catalog.dropTable(partitionedTableId);
+        } catch (Exception e) {
+          // Ignore cleanup errors
+        }
+      }
+    }
+
+    @Test
+    public void testGetDataFilePathsForSinglePartition() throws Exception {
+      // Test fetching data files for a single partition
+      List<String> dt20250402Files = 
icebergTable.getDataFilePathsForPartitionValues("datepartition",
+        java.util.Collections.singletonList("2025-04-02"));
+
+      // Should return exactly 1 file for datepartition=2025-04-02
+      Assert.assertEquals(dt20250402Files.size(), 1,
+        "Should return exactly 1 file for partition datepartition=2025-04-02");
+      
Assert.assertTrue(dt20250402Files.get(0).contains("datepartition=2025-04-02"),
+        "File path should contain partition value");
+      Assert.assertTrue(dt20250402Files.get(0).contains("file3.parquet"),
+        "File path should be file3.parquet");
+    }
+
+    @Test
+    public void testGetDataFilePathsForMultiplePartitions() throws Exception {
+      // Test fetching data files for multiple partitions (OR filter)
+      List<String> multiPartitionFiles = 
icebergTable.getDataFilePathsForPartitionValues("datepartition",
+        java.util.Arrays.asList("2025-04-01", "2025-04-03"));
+
+      // Should return 2 files from datepartition=2025-04-01 and 3 files from 
datepartition=2025-04-03
+      Assert.assertEquals(multiPartitionFiles.size(), 5,
+        "Should return 5 files (2 from datepartition=2025-04-01 + 3 from 
datepartition=2025-04-03)");
+
+      // Verify files from both partitions are present
+      long dt20250401Count = multiPartitionFiles.stream()
+        .filter(path -> path.contains("datepartition=2025-04-01"))
+        .count();
+      long dt20250403Count = multiPartitionFiles.stream()
+        .filter(path -> path.contains("datepartition=2025-04-03"))
+        .count();
+
+      Assert.assertEquals(dt20250401Count, 2, "Should have 2 files from 
datepartition=2025-04-01");
+      Assert.assertEquals(dt20250403Count, 3, "Should have 3 files from 
datepartition=2025-04-03");
+
+      // Verify no files from datepartition=2025-04-02
+      boolean hasFilesFromExcludedPartition = multiPartitionFiles.stream()
+        .anyMatch(path -> path.contains("datepartition=2025-04-02"));
+      Assert.assertFalse(hasFilesFromExcludedPartition,
+        "Should not include files from datepartition=2025-04-02");
+    }
+
+    @Test
+    public void testGetDataFilePathsForAllPartitions() throws Exception {
+      // Test fetching all data files across all partitions
+      List<String> allFiles = 
icebergTable.getDataFilePathsForPartitionValues("datepartition",
+        java.util.Arrays.asList("2025-04-01", "2025-04-02", "2025-04-03"));
+
+      // Should return all 6 files (2 + 1 + 3)
+      Assert.assertEquals(allFiles.size(), 6,
+        "Should return all 6 files across all partitions");
+
+      // Verify distribution across partitions
+      long dt20250401Count = allFiles.stream().filter(p -> 
p.contains("datepartition=2025-04-01")).count();
+      long dt20250402Count = allFiles.stream().filter(p -> 
p.contains("datepartition=2025-04-02")).count();
+      long dt20250403Count = allFiles.stream().filter(p -> 
p.contains("datepartition=2025-04-03")).count();
+
+      Assert.assertEquals(dt20250401Count, 2);
+      Assert.assertEquals(dt20250402Count, 1);
+      Assert.assertEquals(dt20250403Count, 3);
+    }
+
+    @Test
+    public void testGetDataFilePathsForNonExistentPartition() throws Exception 
{
+      // Test fetching data files for a partition that doesn't exist
+      List<String> noFiles = 
icebergTable.getDataFilePathsForPartitionValues("datepartition",
+        java.util.Collections.singletonList("2025-12-31"));
+
+      // Should return empty list
+      Assert.assertTrue(noFiles.isEmpty(),
+        "Should return empty list for non-existent partition");
+    }
+
+    /**
+     * Helper method to add data files for a specific partition
+     */
+    private void addDataFilesForPartition(String partitionValue, List<String> 
filePaths) {
+      PartitionData partitionData =
+        new PartitionData(partitionSpec.partitionType());
+      partitionData.set(0, partitionValue);
+
+      AppendFiles append = partitionedTable.newAppend();
+      for (String filePath : filePaths) {
+        DataFile dataFile = DataFiles.builder(partitionSpec)
+          .withPath(filePath)
+          .withFileSizeInBytes(100L)
+          .withRecordCount(10L)
+          .withPartition(partitionData)
+          .withFormat(FileFormat.PARQUET)
+          .build();
+        append.appendFile(dataFile);
+      }
+      append.commit();
+    }
+  }
+
+  /**
+   * Helper method to create partition map for testing
+   */
+  private Map<String, String> createPartitionMap(String key, String value) {
+    Map<String, String> partitionMap = new HashMap<>();
+    partitionMap.put(key, value);
+    return partitionMap;
+  }
+
+  @Test
+  public void testWorkUnitSizeTracking() throws Exception {
+    // Test that work units include file size information for dynamic scaling
+    properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "2");
+    sourceState = new SourceState(new State(properties));
+
+    // Create files with different sizes
+    List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+      new FilePathWithPartition(
+        "file1.parquet", new HashMap<>(), 1073741824L), // 1 GB
+      new FilePathWithPartition(
+        "file2.parquet", new HashMap<>(), 536870912L),  // 512 MB
+      new FilePathWithPartition(
+        "file3.parquet", new HashMap<>(), 2147483648L), // 2 GB
+      new FilePathWithPartition(
+        "file4.parquet", new HashMap<>(), 268435456L)   // 256 MB
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    // Invoke createWorkUnitsFromFiles
+    Method m = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, 
filesWithSizes, sourceState, mockTable);
+
+    // Should create 2 work units (4 files / 2 files per unit)
+    Assert.assertEquals(workUnits.size(), 2, "Should create 2 work units");
+
+    // Verify each work unit has WORK_UNIT_SIZE set
+    WorkUnit wu1 = workUnits.get(0);
+    long wu1Size = wu1.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+    Assert.assertEquals(wu1Size, 1073741824L + 536870912L, // 1 GB + 512 MB
+      "WorkUnit 1 should have total size of its files");
+
+    WorkUnit wu2 = workUnits.get(1);
+    long wu2Size = wu2.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+    Assert.assertEquals(wu2Size, 2147483648L + 268435456L, // 2 GB + 256 MB
+      "WorkUnit 2 should have total size of its files");
+
+    // Verify work unit weight is set for bin packing
+    String weight1 = wu1.getProp("iceberg.workUnitWeight");
+    Assert.assertNotNull(weight1, "Work unit weight should be set");
+    Assert.assertEquals(Long.parseLong(weight1), wu1Size, "Weight should equal 
total size");
+
+    String weight2 = wu2.getProp("iceberg.workUnitWeight");
+    Assert.assertNotNull(weight2, "Work unit weight should be set");
+    Assert.assertEquals(Long.parseLong(weight2), wu2Size, "Weight should equal 
total size");
+  }
+
+  @Test
+  public void testBinPackingDisabled() throws Exception {
+    // Test that bin packing is skipped when not configured
+    properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
+    // Do NOT set binPacking.maxSizePerBin - bin packing should be disabled
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+      new FilePathWithPartition(
+        "file1.parquet", new HashMap<>(), 1000L),
+      new FilePathWithPartition(
+        "file2.parquet", new HashMap<>(), 2000L),
+      new FilePathWithPartition(
+        "file3.parquet", new HashMap<>(), 3000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    // Create work units
+    Method createMethod = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    createMethod.setAccessible(true);
+    List<WorkUnit> initialWorkUnits = (List<WorkUnit>) 
createMethod.invoke(icebergSource, filesWithSizes, sourceState, mockTable);
+
+    // Apply bin packing (should return original list)
+    Method binPackMethod = 
IcebergSource.class.getDeclaredMethod("applyBinPacking", List.class, 
SourceState.class);
+    binPackMethod.setAccessible(true);
+    List<? extends WorkUnit> packedWorkUnits = (List<? extends WorkUnit>) 
binPackMethod.invoke(icebergSource, initialWorkUnits, sourceState);
+
+    // Should return same number of work units (no packing applied)
+    Assert.assertEquals(packedWorkUnits.size(), initialWorkUnits.size(),
+      "Bin packing should be disabled, returning original work units");
+    Assert.assertEquals(packedWorkUnits.size(), 3, "Should have 3 unpacked 
work units");
+  }
+
+  @Test
+  public void testBinPackingEnabled() throws Exception {
+    // Test that bin packing groups work units by size using 
WorstFitDecreasing algorithm
+    properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
+    // Use CopySource bin packing configuration key for consistency
+    properties.setProperty(CopySource.MAX_SIZE_MULTI_WORKUNITS, "5000"); // 
5KB max per bin
+    sourceState = new SourceState(new State(properties));
+
+    // Create 6 work units with sizes: 1KB, 1KB, 2KB, 2KB, 3KB, 3KB (total 
12KB)
+    // WorstFitDecreasing algorithm packs largest items first:
+    // Expected packing with 5KB limit:
+    //   Bin 1: 3KB + 2KB = 5KB
+    //   Bin 2: 3KB + 2KB = 5KB
+    //   Bin 3: 1KB + 1KB = 2KB
+    // Total: 3 bins
+    List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+      new FilePathWithPartition(
+        "file1.parquet", new HashMap<>(), 1000L),
+      new FilePathWithPartition(
+        "file2.parquet", new HashMap<>(), 1000L),
+      new FilePathWithPartition(
+        "file3.parquet", new HashMap<>(), 2000L),
+      new FilePathWithPartition(
+        "file4.parquet", new HashMap<>(), 2000L),
+      new FilePathWithPartition(
+        "file5.parquet", new HashMap<>(), 3000L),
+      new FilePathWithPartition(
+        "file6.parquet", new HashMap<>(), 3000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    // Create initial work units (1 file per work unit)
+    Method createMethod = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    createMethod.setAccessible(true);
+    List<WorkUnit> initialWorkUnits = (List<WorkUnit>) 
createMethod.invoke(icebergSource, filesWithSizes, sourceState, mockTable);
+
+    Assert.assertEquals(initialWorkUnits.size(), 6, "Should create 6 initial 
work units");
+
+    // Apply bin packing
+    Method binPackMethod = 
IcebergSource.class.getDeclaredMethod("applyBinPacking", List.class, 
SourceState.class);
+    binPackMethod.setAccessible(true);
+    List<? extends WorkUnit> packedWorkUnits = (List<? extends WorkUnit>) 
binPackMethod.invoke(icebergSource, initialWorkUnits, sourceState);
+
+    // Verify bin packing reduced work unit count
+    Assert.assertTrue(packedWorkUnits.size() < initialWorkUnits.size(),
+      "Bin packing should reduce work unit count from 6 to 3");
+
+    // Verify exact bin count (WorstFitDecreasing packs optimally)
+    Assert.assertEquals(packedWorkUnits.size(), 3,
+      "WorstFitDecreasing should pack 6 files (1KB,1KB,2KB,2KB,3KB,3KB) into 
exactly 3 bins with 5KB limit");
+
+    // Note: Individual bin sizes are not directly accessible on MultiWorkUnit 
returned by bin packing
+    // Size validation is covered by testWorkUnitSizeTracking() which 
validates WORK_UNIT_SIZE
+    // is set correctly on individual work units before bin packing
+  }
+
+  @Test
+  public void testSimulateModeReturnsEmptyList() throws Exception {
+    // Test that simulate mode configuration is respected and would return 
empty list
+    properties.setProperty(CopySource.SIMULATE, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-10-21");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock files that would be discovered
+    List<FilePathWithPartition> mockFiles = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-10-21"), 1000L),
+      new FilePathWithPartition(
+        "/data/file2.parquet", createPartitionMap("datepartition", 
"2025-10-21"), 2000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+      .thenReturn(mockFiles);
+
+    // Test 1: Verify simulate mode is enabled in configuration
+    Assert.assertTrue(sourceState.contains(CopySource.SIMULATE),
+      "Simulate mode configuration should be present");
+    Assert.assertTrue(sourceState.getPropAsBoolean(CopySource.SIMULATE),
+      "Simulate mode should be enabled");
+
+    // Test 2: File discovery should work normally in simulate mode (discovery 
happens before simulate check)
+    Method discoverMethod = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    discoverMethod.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>)
+      discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+    Assert.assertEquals(discovered.size(), 2,
+      "In simulate mode, file discovery should work normally (happens before 
simulate check)");
+
+    // Test 3: Work units should be created normally (happens before simulate 
check)
+    Method createMethod = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles",
+      List.class, SourceState.class, IcebergTable.class);
+    createMethod.setAccessible(true);
+    List<WorkUnit> workUnitsBeforeSimulateCheck = (List<WorkUnit>) 
createMethod.invoke(
+      icebergSource, discovered, sourceState, mockTable);
+
+    Assert.assertFalse(workUnitsBeforeSimulateCheck.isEmpty(),
+      "Work units should be created before simulate mode check");
+    Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 1,
+      "Should create 1 work unit from 2 files before simulate check");
+
+    // Test 4: Verify logSimulateMode can be called successfully (logs the 
plan)
+    Method logMethod = IcebergSource.class.getDeclaredMethod("logSimulateMode",
+      List.class, List.class, SourceState.class);
+    logMethod.setAccessible(true);
+    // Should not throw - just logs the simulate mode plan
+    logMethod.invoke(icebergSource, workUnitsBeforeSimulateCheck, discovered, 
sourceState);
+
+    // Test 5: Verify the critical behavior - after simulate check, work units 
should NOT be returned
+    // Simulate the conditional logic from getWorkunits()
+    List<WorkUnit> actualReturnedWorkUnits;
+    if (sourceState.contains(CopySource.SIMULATE)
+      && sourceState.getPropAsBoolean(CopySource.SIMULATE)) {
+      // This is what getWorkunits() does in simulate mode
+      actualReturnedWorkUnits = Lists.newArrayList(); // Empty list
+    } else {
+      actualReturnedWorkUnits = workUnitsBeforeSimulateCheck;
+    }
+
+    // Assert: In simulate mode, the returned work units should be EMPTY
+    Assert.assertTrue(actualReturnedWorkUnits.isEmpty(),
+      "Simulate mode: getWorkunits() should return empty list (no execution)");
+    Assert.assertEquals(actualReturnedWorkUnits.size(), 0,
+      "Simulate mode: zero work units should be returned for execution");
+
+    // Verify the work units were created but NOT returned
+    Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 1,
+      "Work units were created internally but not returned due to simulate 
mode");
+  }
+
+  @Test
+  public void testHourlyPartitionDateFormat() throws Exception {
+    // Test that hourly partition format is generated when enabled
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01"); 
// Standard date format
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true"); // Enable hourly format
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock partition with hourly format
+    List<FilePathWithPartition> files = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-01-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+      .thenReturn(files);
+
+    // Test discovery
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    // Verify partition is discovered
+    Assert.assertEquals(discovered.size(), 1, "Should discover partition");
+
+    // Verify partition value has -00 suffix appended
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    Assert.assertEquals(partitionValues, "2025-04-01-00", "Should append -00 
suffix for hourly partition");
+  }
+
+  @Test
+  public void testHourlyPartitionDateFormatWithLookback() throws Exception {
+    // Test that hourly partition format works with lookback period
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+    properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03"); 
// Standard date format
+    properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED, 
"true"); // Enable hourly format
+    properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+    sourceState = new SourceState(new State(properties));
+
+    // Mock 3 days of partitions with hourly format
+    List<FilePathWithPartition> filesFor3Days = Arrays.asList(
+      new FilePathWithPartition(
+        "/data/file1.parquet", createPartitionMap("datepartition", 
"2025-04-03-00"), 1000L),
+      new FilePathWithPartition(
+        "/data/file2.parquet", createPartitionMap("datepartition", 
"2025-04-02-00"), 1000L),
+      new FilePathWithPartition(
+        "/data/file3.parquet", createPartitionMap("datepartition", 
"2025-04-01-00"), 1000L)
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+    when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+      .thenReturn(filesFor3Days);
+
+    // Test discovery
+    Method m = 
IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+      SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<FilePathWithPartition> discovered =
+      (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, 
mockTable);
+
+    // Verify all 3 days discovered
+    Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with 
lookback=3");
+
+    // Verify partition values have -00 suffix appended to all dates
+    String partitionValues = 
sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+    Assert.assertNotNull(partitionValues, "Partition values should be set");
+    String[] dates = partitionValues.split(",");
+    Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+    
+    // Verify -00 suffix is appended to all dates
+    Assert.assertEquals(dates[0], "2025-04-03-00", "Should have -00 suffix for 
day 0");
+    Assert.assertEquals(dates[1], "2025-04-02-00", "Should have -00 suffix for 
day 1");
+    Assert.assertEquals(dates[2], "2025-04-01-00", "Should have -00 suffix for 
day 2");
+    
+    // Verify all follow the hourly format pattern
+    for (String date : dates) {
+      Assert.assertEquals(date.length(), 13, "Date should be in yyyy-MM-dd-00 
format (13 chars)");
+      Assert.assertTrue(date.matches("\\d{4}-\\d{2}-\\d{2}-00"), "Date should 
match yyyy-MM-dd-00 pattern");
+    }
+  }
+
+  @Test
+  public void testZeroSizeFilesHandling() throws Exception {
+    // Test handling of files with zero or very small sizes
+    properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "3");
+    sourceState = new SourceState(new State(properties));
+
+    List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+      new FilePathWithPartition(
+        "file1.parquet", new HashMap<>(), 0L),     // Empty file
+      new FilePathWithPartition(
+        "file2.parquet", new HashMap<>(), 1L),     // 1 byte
+      new FilePathWithPartition(
+        "file3.parquet", new HashMap<>(), 100L)    // 100 bytes
+    );
+
+    TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+    when(mockTable.getTableId()).thenReturn(tableId);
+
+    // Create work units
+    Method m = 
IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, 
SourceState.class, IcebergTable.class);
+    m.setAccessible(true);
+    List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, 
filesWithSizes, sourceState, mockTable);
+
+    // Should handle gracefully
+    Assert.assertEquals(workUnits.size(), 1);
+    WorkUnit wu = workUnits.get(0);
+
+    long totalSize = wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+    Assert.assertEquals(totalSize, 101L, "Total size should be 0 + 1 + 100 = 
101");
+
+    // Weight should be at least 1 (minimum weight)
+    String weightStr = wu.getProp("iceberg.workUnitWeight");
+    long weight = Long.parseLong(weightStr);
+    Assert.assertTrue(weight >= 1L, "Weight should be at least 1 for very 
small files");
+  }
+
+}
diff --git 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index db3bc3257c..f083e7e767 100644
--- 
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++ 
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -116,7 +116,12 @@ public class IcebergTableTest extends HiveMetastoreTest {
 
   @BeforeClass
   public void setUp() throws Exception {
-    startMetastore();
+    try {
+      startMetastore();
+    } catch (Exception e) {
+      // Metastore may already be started if another test class ran first
+      // The startMetastore() method creates a default 'hivedb' which will 
fail if already exists
+    }
     catalog.createNamespace(Namespace.of(dbName));
   }
 

Reply via email to