This is an automated email from the ASF dual-hosted git repository.
vivekrai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new e98452a049 [GOBBLIN-2231] Implement IcebergSource to enable copying Iceberg data files to any dest (#4146)
e98452a049 is described below
commit e98452a0495e38e029b056d8953ba3ab417ae7af
Author: Prateek Khandelwal <[email protected]>
AuthorDate: Mon Nov 10 22:59:47 2025 +0530
[GOBBLIN-2231] Implement IcebergSource to enable copying Iceberg data files to any dest (#4146)
* Implement IcebergSource to enable copying Iceberg data files to any dest
* Introduce partition-aware discovery & lookback period based copy for IcebergSource
* Added unit tests for IcebergSource
* Added unit tests for IcebergFileStreamHelper
* Add FilePathWithPartition class in IcebergTable
* Address review comments
* Fix indentation
* Handling for yyyy-MM-dd-00 partition format
* Remove unused import
* Address review comment
* Fix core tests error
* Fix failing test
* Fix failing test
---
.../copy/iceberg/IcebergFileStreamExtractor.java | 58 +
.../copy/iceberg/IcebergFileStreamHelper.java | 140 +++
.../management/copy/iceberg/IcebergSource.java | 604 +++++++++++
.../data/management/copy/iceberg/IcebergTable.java | 125 +++
.../copy/iceberg/IcebergFileStreamHelperTest.java | 302 ++++++
.../management/copy/iceberg/IcebergSourceTest.java | 1116 ++++++++++++++++++++
.../management/copy/iceberg/IcebergTableTest.java | 7 +-
7 files changed, 2351 insertions(+), 1 deletion(-)
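For orientation before the diffs: the new source is driven entirely by job properties. Below is a minimal sketch of the required configuration, assembled as Java Properties (the catalog URI is a placeholder; the keys mirror the IcebergSource constants introduced later in this commit):

    import java.util.Properties;

    public class IcebergSourceConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Required by IcebergSource#validateConfiguration
        props.setProperty("iceberg.database.name", "db1");
        props.setProperty("iceberg.table.name", "table1");
        props.setProperty("iceberg.catalog.uri", "thrift://metastore:9083"); // placeholder URI
        // Optional partition filtering with a 3-day lookback
        props.setProperty("iceberg.filter.enabled", "true");
        props.setProperty("iceberg.filter.date", "CURRENT_DATE");
        props.setProperty("iceberg.lookback.days", "3");
        props.list(System.out);
      }
    }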
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
new file mode 100644
index 0000000000..e87b187d74
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.NotImplementedException;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor;
+
+/**
+ * Extractor for file streaming mode that creates a FileAwareInputStream for each file.
+ *
+ * This extractor is used when {@code iceberg.record.processing.enabled=false} to stream
+ * Iceberg table files as binary data to destinations such as Azure or HDFS.
+ *
+ * Each "record" is a {@link FileAwareInputStream} representing one file from
+ * the Iceberg table. The downstream writer handles streaming the file content.
+ */
+@Slf4j
+public class IcebergFileStreamExtractor extends FileBasedExtractor<String, FileAwareInputStream> {
+
+  public IcebergFileStreamExtractor(WorkUnitState workUnitState) throws IOException {
+    super(workUnitState, new IcebergFileStreamHelper(workUnitState));
+  }
+
+  @Override
+  public String getSchema() {
+    // For file streaming, schema is not used by IdentityConverter; returning a constant
+    return "FileAwareInputStream";
+  }
+
+  @Override
+  public Iterator<FileAwareInputStream> downloadFile(String filePath) throws IOException {
+    throw new NotImplementedException("Not yet implemented");
+  }
+
+}
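A note on the extractor's record model: each record is an open stream for one table file. The following self-contained JDK sketch (FileRecord is a hypothetical stand-in for Gobblin's FileAwareInputStream, not the real API) illustrates the one-record-per-file iteration the class builds on:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.Iterator;
    import java.util.List;

    public class FileStreamRecordSketch {
      // Hypothetical pairing of a file path with its open stream.
      record FileRecord(String path, InputStream stream) {}

      // Open streams lazily, yielding one record per file.
      static Iterator<FileRecord> records(List<Path> files) {
        Iterator<Path> it = files.iterator();
        return new Iterator<>() {
          public boolean hasNext() { return it.hasNext(); }
          public FileRecord next() {
            Path p = it.next();
            try {
              return new FileRecord(p.toString(), Files.newInputStream(p));
            } catch (IOException e) {
              throw new RuntimeException("Failed to open " + p, e);
            }
          }
        };
      }

      public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("sketch", ".parquet");
        Iterator<FileRecord> it = records(List.of(tmp));
        while (it.hasNext()) {
          try (InputStream in = it.next().stream()) {
            System.out.println("streamed " + in.available() + " bytes");
          }
        }
      }
    }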
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
new file mode 100644
index 0000000000..ebe69ef1ce
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper;
+
+/**
+ * File-based helper for Iceberg file streaming operations.
+ *
+ * This helper supports file streaming mode where Iceberg table files
+ * are streamed as binary data without record-level processing.
+ */
+@Slf4j
+public class IcebergFileStreamHelper implements TimestampAwareFileBasedHelper {
+
+ private final State state;
+ private final Configuration configuration;
+ private FileSystem fileSystem;
+
+ public IcebergFileStreamHelper(State state) {
+ this.state = state;
+ this.configuration = new Configuration();
+
+ // Add any Hadoop configuration from job properties
+ for (String key : state.getPropertyNames()) {
+ if (key.startsWith("fs.") || key.startsWith("hadoop.")) {
+ configuration.set(key, state.getProp(key));
+ }
+ }
+ }
+
+ @Override
+ public void connect() throws FileBasedHelperException {
+ try {
+ this.fileSystem = FileSystem.get(configuration);
+ log.info("Connected to Iceberg file stream helper with FileSystem: {}",
fileSystem.getClass().getSimpleName());
+ } catch (IOException e) {
+ throw new FileBasedHelperException("Failed to initialize FileSystem for
Iceberg file streaming", e);
+ }
+ }
+
+ @Override
+ public List<String> ls(String path) throws FileBasedHelperException {
+ try {
+ // For Iceberg, file discovery is handled by IcebergSource
+ // This method returns files from work unit configuration
+      List<String> filesToPull = state.getPropAsList(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+ log.debug("Returning {} files for processing", filesToPull.size());
+ return filesToPull;
+ } catch (Exception e) {
+ throw new FileBasedHelperException("Failed to list files", e);
+ }
+ }
+
+ @Override
+  public InputStream getFileStream(String filePath) throws FileBasedHelperException {
+    try {
+      Path path = new Path(filePath);
+      FileSystem fs = getFileSystemForPath(path);
+      return fs.open(path);
+    } catch (IOException e) {
+      throw new FileBasedHelperException("Failed to get file stream for: " + filePath, e);
+ }
+ }
+
+ @Override
+ public long getFileSize(String filePath) throws FileBasedHelperException {
+ try {
+ Path path = new Path(filePath);
+ FileSystem fs = getFileSystemForPath(path);
+ return fs.getFileStatus(path).getLen();
+ } catch (IOException e) {
+ throw new FileBasedHelperException("Failed to get file size for: " +
filePath, e);
+ }
+ }
+
+ @Override
+ public long getFileMTime(String filePath) throws FileBasedHelperException {
+ try {
+ Path path = new Path(filePath);
+ FileSystem fs = getFileSystemForPath(path);
+ return fs.getFileStatus(path).getModificationTime();
+ } catch (IOException e) {
+ throw new FileBasedHelperException("Failed to get file modification time
for: " + filePath, e);
+ }
+ }
+
+ private FileSystem getFileSystemForPath(Path path) throws IOException {
+    // If path has a different scheme than the default FileSystem, get scheme-specific FS
+ if (path.toUri().getScheme() != null &&
+ !path.toUri().getScheme().equals(fileSystem.getUri().getScheme())) {
+ return path.getFileSystem(configuration);
+ }
+ return fileSystem;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (fileSystem != null) {
+ try {
+ fileSystem.close();
+ log.info("Closed Iceberg file stream helper and FileSystem
connection");
+ } catch (IOException e) {
+ log.warn("Error closing FileSystem connection", e);
+ throw e;
+ }
+ } else {
+ log.debug("Closing Iceberg file stream helper - no FileSystem to close");
+ }
+ }
+
+}
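The helper reuses one default FileSystem but switches to a scheme-specific instance when a path's URI scheme differs (hdfs://, abfs://, ...). A hedged sketch of that resolution against only the Hadoop client API:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class SchemeAwareFsSketch {
      // Mirrors getFileSystemForPath: reuse the default FS unless the
      // path's URI scheme differs (e.g., hdfs:// or abfs:// vs. file://).
      static FileSystem resolve(Path path, FileSystem defaultFs, Configuration conf) throws IOException {
        String scheme = path.toUri().getScheme();
        if (scheme != null && !scheme.equals(defaultFs.getUri().getScheme())) {
          return path.getFileSystem(conf); // scheme-specific FileSystem
        }
        return defaultFs;
      }

      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSystem defaultFs = FileSystem.get(conf); // file:/// unless fs.defaultFS is set
        FileSystem fs = resolve(new Path("file:///tmp/data.parquet"), defaultFs, conf);
        System.out.println("Resolved FS: " + fs.getUri());
      }
    }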
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
new file mode 100644
index 0000000000..10baa7846c
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java
@@ -0,0 +1,604 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
+ *
+ * This source reads job configuration, applies date partition filters with an optional lookback period,
+ * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files
+ * into work units for parallel processing.
+ *
+ * <pre>
+ * # Basic configuration
+ * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource
+ * iceberg.database.name=db1
+ * iceberg.table.name=table1
+ * iceberg.catalog.uri=ICEBERG_CATALOG_URI
+ *
+ * # Partition filtering with lookback - Static date (hourly partitions by default)
+ * iceberg.filter.enabled=true
+ * iceberg.partition.column=datepartition  # Optional, defaults to "datepartition"
+ * iceberg.filter.date=2025-04-01  # Date value (yyyy-MM-dd format)
+ * iceberg.lookback.days=3
+ * # iceberg.hourly.partition.enabled=true  # Optional, defaults to true; generates yyyy-MM-dd-00 format
+ *
+ * # Partition filtering with lookback - Scheduled flows (dynamic date)
+ * iceberg.filter.enabled=true
+ * iceberg.filter.date=CURRENT_DATE # Uses current date at runtime
+ * iceberg.lookback.days=3
+ *
+ * # Standard daily partitions (disable hourly suffix if table uses yyyy-MM-dd format)
+ * iceberg.filter.enabled=true
+ * iceberg.filter.date=2025-04-01 # Date in yyyy-MM-dd format
+ * iceberg.hourly.partition.enabled=false # Set false for non-hourly partitions
+ * iceberg.lookback.days=3
+ *
+ * # Copy all data (no filtering)
+ * iceberg.filter.enabled=false
+ * # No filter.date needed - will copy all partitions from current snapshot
+ *
+ * # Bin packing for better resource utilization
+ * gobblin.copy.binPacking.maxSizePerBin=1000000000 # 1GB per bin
+ * </pre>
+ */
+@Slf4j
+public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> {
+
+  public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name";
+  public static final String ICEBERG_TABLE_NAME = "iceberg.table.name";
+  public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri";
+  public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class";
+  public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled";
+  public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
+  public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit";
+  public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
+  public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
+  public static final String ICEBERG_FILTER_DATE = "iceberg.filter.date"; // Date value (e.g., 2025-04-01 or CURRENT_DATE)
+  public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
+  public static final int DEFAULT_LOOKBACK_DAYS = 1;
+  public static final String ICEBERG_PARTITION_COLUMN = "iceberg.partition.column"; // configurable partition column name
+  public static final String DEFAULT_DATE_PARTITION_COLUMN = "datepartition"; // default date partition column name
+  public static final String CURRENT_DATE_PLACEHOLDER = "CURRENT_DATE"; // placeholder for current date
+  public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key";
+  public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values";
+  public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path";
+  public static final String ICEBERG_HOURLY_PARTITION_ENABLED = "iceberg.hourly.partition.enabled";
+ public static final boolean DEFAULT_HOURLY_PARTITION_ENABLED = true;
+ private static final String HOURLY_PARTITION_SUFFIX = "-00";
+ private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
+
+ private Optional<LineageInfo> lineageInfo;
+  private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
+
+ /**
+   * Initialize file system helper based on mode (streaming vs record processing)
+   */
+  @Override
+  public void initFileSystemHelper(State state) throws FileBasedHelperException {
+    // For file streaming mode, we use IcebergFileStreamHelper
+    // For record processing mode, we'll use a different helper (future implementation)
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+        ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Future: Initialize helper for record processing
+      throw new UnsupportedOperationException("Record processing mode not yet implemented. " +
+          "This will be added when SQL/Data Lake destinations are required.");
+    } else {
+      // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper
+ this.fsHelper = new IcebergFileStreamHelper(state);
+ this.fsHelper.connect();
+ }
+ }
+
+ /**
+ * Get work units by discovering files from Iceberg table
+ * @param state is the source state
+ * @return List<WorkUnit> list of work units
+ */
+ @Override
+ public List<WorkUnit> getWorkunits(SourceState state) {
+ this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+ try {
+ initFileSystemHelper(state);
+
+ validateConfiguration(state);
+
+ IcebergCatalog catalog = createCatalog(state);
+ String database = state.getProp(ICEBERG_DATABASE_NAME);
+ String table = state.getProp(ICEBERG_TABLE_NAME);
+ IcebergTable icebergTable = catalog.openTable(database, table);
+
+      List<IcebergTable.FilePathWithPartition> filesWithPartitions = discoverPartitionFilePaths(state, icebergTable);
+      log.info("Discovered {} files from Iceberg table {}.{}", filesWithPartitions.size(), database, table);
+
+      // Create work units from discovered files
+      List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, state, icebergTable);
+
+      // Handle simulate mode - log what would be copied without executing
+      if (state.contains(CopySource.SIMULATE) && state.getPropAsBoolean(CopySource.SIMULATE)) {
+        log.info("Simulate mode enabled. Will not execute the copy.");
+        logSimulateMode(workUnits, filesWithPartitions, state);
+        return Lists.newArrayList();
+      }
+
+      // Apply bin packing to work units if configured
+      List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, state);
+      log.info("Work unit creation complete. Initial work units: {}, packed work units: {}",
+ workUnits.size(), packedWorkUnits.size());
+
+ return Lists.newArrayList(packedWorkUnits);
+
+ } catch (Exception e) {
+ log.error("Failed to create work units for Iceberg table", e);
+ throw new RuntimeException("Failed to create work units", e);
+ }
+ }
+
+  /**
+   * Get extractor based on mode (streaming vs record processing)
+   *
+   * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned {@link Extractor}
+   * @return an {@link Extractor} for the configured mode
+   * @throws IOException
+   */
+  @Override
+  public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException {
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+        ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Return record processing extractor
+      throw new UnsupportedOperationException("Record processing mode not yet implemented.");
+ } else {
+ // Return file streaming extractor
+ return new IcebergFileStreamExtractor(state);
+ }
+ }
+
+ /**
+   * Discover partition data files using Iceberg TableScan API with optional lookback for date partitions.
+   *
+   * <p>This method supports three modes:
+   * <ol>
+   * <li><b>Full table scan (copy all data)</b>: Set {@code iceberg.filter.enabled=false}.
+   * Returns all data files from current snapshot. Use this for one-time full copies or backfills.</li>
+   * <li><b>Static date partition filter</b>: Set {@code iceberg.filter.enabled=true} with a specific date
+   * (e.g., {@code iceberg.filter.date=2025-04-01}). Use this for ad-hoc historical data copies.</li>
+   * <li><b>Dynamic date partition filter</b>: Set {@code iceberg.filter.enabled=true} with
+   * {@code iceberg.filter.date=CURRENT_DATE}. The {@value #CURRENT_DATE_PLACEHOLDER} placeholder
+   * is resolved to the current date at runtime. Use this for daily scheduled flows.</li>
+   * </ol>
+   *
+   * <p>The partition column name is configurable via {@code iceberg.partition.column}
+   * (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is specified separately via
+   * {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
+   *
+   * <p><b>Hourly Partition Support:</b> By default, the {@code -00} suffix is appended to all partition values
+   * for tables using hourly partition format ({@code yyyy-MM-dd-HH}). For tables using standard daily partition
+   * format ({@code yyyy-MM-dd}), set {@code iceberg.hourly.partition.enabled=false}. Users should always provide
+   * dates in standard {@code yyyy-MM-dd} format; the hour suffix is automatically managed.
+   *
+   * <p><b>Configuration Examples:</b>
+   * <ul>
+   * <li>Static: {@code iceberg.partition.column=datepartition, iceberg.filter.date=2025-04-03, iceberg.lookback.days=3}
+   * discovers: datepartition=2025-04-03, 2025-04-02, 2025-04-01</li>
+   * <li>Dynamic: {@code iceberg.filter.date=CURRENT_DATE, iceberg.lookback.days=1}
+   * discovers today's partition only (resolved at runtime)</li>
+   * </ul>
+   *
+   * @param state source state containing filter configuration
+   * @param icebergTable the Iceberg table to scan
+   * @return list of file paths with partition metadata matching the filter criteria
+   * @throws IOException if table scan or file discovery fails
+   */
+  private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws IOException {
+    boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true);
+
+    if (!filterEnabled) {
+      log.info("Partition filter disabled, discovering all data files with partition metadata from current snapshot");
+      // Use TableScan without filter to get all files with partition metadata preserved
+      // This ensures partition structure is maintained even for full table copies
+      List<IcebergTable.FilePathWithPartition> result = icebergTable.getFilePathsWithPartitionsForFilter(Expressions.alwaysTrue());
+      log.info("Discovered {} data files from current snapshot with partition metadata", result.size());
+ return result;
+ }
+
+    String datePartitionColumn = state.getProp(ICEBERG_PARTITION_COLUMN, DEFAULT_DATE_PARTITION_COLUMN);
+
+    String dateValue = state.getProp(ICEBERG_FILTER_DATE);
+    Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
+        "iceberg.filter.date is required when iceberg.filter.enabled=true");
+
+    // Handle CURRENT_DATE placeholder for flows
+    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+      dateValue = LocalDate.now().toString();
+      log.info("Resolved {} placeholder to current date: {}", CURRENT_DATE_PLACEHOLDER, dateValue);
+ }
+
+ // Apply lookback period for date partitions
+ // lookbackDays=1 (default) means copy only the specified date
+    // lookbackDays=3 means copy specified date + 2 previous days (total 3 days)
+    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+    List<String> values = Lists.newArrayList();
+
+    if (lookbackDays >= 1) {
+      log.info("Applying lookback period of {} days for date partition column '{}': {}", lookbackDays, datePartitionColumn, dateValue);
+
+      // Check if hourly partitioning is enabled
+      boolean isHourlyPartition = state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, DEFAULT_HOURLY_PARTITION_ENABLED);
+
+      // Parse the date in yyyy-MM-dd format
+      LocalDate start;
+      try {
+        start = LocalDate.parse(dateValue);
+      } catch (java.time.format.DateTimeParseException e) {
+        String errorMsg = String.format(
+            "Invalid date format for '%s': '%s'. Expected format: yyyy-MM-dd. Error: %s",
+            ICEBERG_FILTER_DATE, dateValue, e.getMessage());
+        log.error(errorMsg);
+        throw new IllegalArgumentException(errorMsg, e);
+      }
+
+      for (int i = 0; i < lookbackDays; i++) {
+        String dateOnly = start.minusDays(i).toString();
+        // Append hour suffix if hourly partitioning is enabled
+        String partitionValue = isHourlyPartition ? dateOnly + HOURLY_PARTITION_SUFFIX : dateOnly;
+        values.add(partitionValue);
+        log.info("Including partition: {}={}", datePartitionColumn, partitionValue);
+      }
+    } else {
+      log.error("lookbackDays < 1, cannot apply lookback. lookbackDays={}", lookbackDays);
+ throw new IllegalArgumentException(String.format(
+ "lookback.days must be >= 1, got: %d", lookbackDays));
+ }
+
+    // Store partition info on state for downstream use (extractor, destination path mapping)
+    state.setProp(ICEBERG_PARTITION_KEY, datePartitionColumn);
+    state.setProp(ICEBERG_PARTITION_VALUES, String.join(",", values));
+
+    // Use Iceberg TableScan API to get only data files (parquet/orc/avro) for specified partitions
+    // TableScan.planFiles() returns DataFiles only - no manifest files or metadata files
+    log.info("Executing TableScan with filter: {}={}", datePartitionColumn, values);
+    Expression icebergExpr = null;
+    for (String val : values) {
+      Expression e = Expressions.equal(datePartitionColumn, val);
+      icebergExpr = (icebergExpr == null) ? e : Expressions.or(icebergExpr, e);
+    }
+
+    List<IcebergTable.FilePathWithPartition> filesWithPartitions = icebergTable.getFilePathsWithPartitionsForFilter(icebergExpr);
+    log.info("Discovered {} data files for partitions: {}", filesWithPartitions.size(), values);
+
+ return filesWithPartitions;
+ }
+
+ /**
+   * Create work units from discovered file paths by grouping them for parallel processing.
+   *
+   * Files are grouped into work units based on {@code iceberg.files.per.workunit} configuration.
+   * Each work unit contains metadata about the files to process.
+   *
+   * @param filesWithPartitions list of file paths with partition metadata to process
+   * @param state source state containing job configuration
+   * @param table the Iceberg table being copied
+   * @return list of work units ready for parallel execution
+   */
+  private List<WorkUnit> createWorkUnitsFromFiles(List<IcebergTable.FilePathWithPartition> filesWithPartitions, SourceState state, IcebergTable table) {
+ List<WorkUnit> workUnits = Lists.newArrayList();
+
+ if (filesWithPartitions.isEmpty()) {
+ log.warn("No files discovered for table {}.{}, returning empty work unit
list",
+ state.getProp(ICEBERG_DATABASE_NAME),
state.getProp(ICEBERG_TABLE_NAME));
+ return workUnits;
+ }
+
+ String nameSpace =
state.getProp(ConfigurationKeys.EXTRACT_NAMESPACE_NAME_KEY, "iceberg");
+ String tableName = table.getTableId().name();
+ Extract extract = new Extract(Extract.TableType.SNAPSHOT_ONLY, nameSpace,
tableName);
+
+ int filesPerWorkUnit = state.getPropAsInt(ICEBERG_FILES_PER_WORKUNIT,
DEFAULT_FILES_PER_WORKUNIT);
+ List<List<IcebergTable.FilePathWithPartition>> groups =
Lists.partition(filesWithPartitions, Math.max(1, filesPerWorkUnit));
+ log.info("Grouping {} files into {} work units ({} files per work unit)",
+ filesWithPartitions.size(), groups.size(), filesPerWorkUnit);
+
+ for (int i = 0; i < groups.size(); i++) {
+ List<IcebergTable.FilePathWithPartition> group = groups.get(i);
+ WorkUnit workUnit = new WorkUnit(extract);
+
+ // Store data file paths and their partition metadata separately
+      // Note: Only data files (parquet/orc/avro) are included, no Iceberg metadata files
+      List<String> filePaths = Lists.newArrayList();
+      Map<String, String> fileToPartitionPath = Maps.newHashMap();
+      long totalSize = 0L;
+
+      for (IcebergTable.FilePathWithPartition fileWithPartition : group) {
+        String filePath = fileWithPartition.getFilePath();
+        filePaths.add(filePath);
+        // Store partition path for each file
+        fileToPartitionPath.put(filePath, fileWithPartition.getPartitionPath());
+        // Accumulate file sizes for work unit weight
+        totalSize += fileWithPartition.getFileSize();
+      }
+
+      workUnit.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, String.join(",", filePaths));
+
+      // Store partition path mapping as JSON for extractor to use
+      workUnit.setProp(ICEBERG_FILE_PARTITION_PATH, new com.google.gson.Gson().toJson(fileToPartitionPath));
+
+      // Set work unit size for dynamic scaling (instead of just file count)
+      workUnit.setProp(ServiceConfigKeys.WORK_UNIT_SIZE, totalSize);
+
+      // Set work unit weight for bin packing
+      setWorkUnitWeight(workUnit, totalSize);
+
+      // Carry partition info to extractor for destination path mapping
+      if (state.contains(ICEBERG_PARTITION_KEY)) {
+        workUnit.setProp(ICEBERG_PARTITION_KEY, state.getProp(ICEBERG_PARTITION_KEY));
+      }
+      if (state.contains(ICEBERG_PARTITION_VALUES)) {
+        workUnit.setProp(ICEBERG_PARTITION_VALUES, state.getProp(ICEBERG_PARTITION_VALUES));
+      }
+
+      // Add lineage information for data governance and tracking
+      addLineageSourceInfo(state, workUnit, table);
+      workUnits.add(workUnit);
+
+      log.info("Created work unit {} with {} files, total size: {} bytes", i, group.size(), totalSize);
+ }
+
+ return workUnits;
+ }
+
+ /**
+ * Create catalog using existing IcebergDatasetFinder logic
+ */
+ private IcebergCatalog createCatalog(SourceState state) throws IOException {
+ String catalogPrefix = "iceberg.catalog.";
+    Map<String, String> catalogProperties = buildMapFromPrefixChildren(state.getProperties(), catalogPrefix);
+
+    Configuration configuration = HadoopUtils.getConfFromProperties(state.getProperties());
+    String catalogClassName = catalogProperties.getOrDefault("class", DEFAULT_ICEBERG_CATALOG_CLASS);
+
+    return IcebergCatalogFactory.create(catalogClassName, catalogProperties, configuration);
+ }
+
+ /**
+   * Build a map of the properties that begin with the given prefix, with the prefix stripped
+   */
+  private Map<String, String> buildMapFromPrefixChildren(Properties properties, String configPrefix) {
+ Map<String, String> catalogProperties = Maps.newHashMap();
+
+ for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+ String key = (String) entry.getKey();
+ if (key.startsWith(configPrefix)) {
+ String relativeKey = key.substring(configPrefix.length());
+ catalogProperties.put(relativeKey, (String) entry.getValue());
+ }
+ }
+
+ String catalogUri = catalogProperties.get("uri");
+ Preconditions.checkNotNull(catalogUri, "Catalog URI is required");
+
+ return catalogProperties;
+ }
+
+ /**
+ * Add lineage information
+ */
+  private void addLineageSourceInfo(SourceState sourceState, WorkUnit workUnit, IcebergTable table) {
+ if (this.lineageInfo != null && this.lineageInfo.isPresent()) {
+ String catalogUri = sourceState.getProp(ICEBERG_CATALOG_URI);
+ String database = sourceState.getProp(ICEBERG_DATABASE_NAME);
+ String tableName = sourceState.getProp(ICEBERG_TABLE_NAME);
+
+ DatasetDescriptor source = new DatasetDescriptor(
+ DatasetConstants.PLATFORM_ICEBERG,
+ URI.create(catalogUri),
+ database + "." + tableName
+ );
+
+ source.addMetadata("catalog.uri", catalogUri);
+ source.addMetadata("table.location", getTableLocation(table));
+
+ this.lineageInfo.get().setSource(source, workUnit);
+ }
+ }
+
+ /**
+ * Get table location from Iceberg table metadata
+ * @param table the Iceberg table
+ * @return table location or "unknown" if not available
+ */
+ private String getTableLocation(IcebergTable table) {
+ try {
+ return table.accessTableMetadata().location();
+ } catch (Exception e) {
+ return "unknown";
+ }
+ }
+
+ /**
+ * Validate required configuration properties
+ */
+ private void validateConfiguration(SourceState state) {
+ String database = state.getProp(ICEBERG_DATABASE_NAME);
+ String table = state.getProp(ICEBERG_TABLE_NAME);
+ String catalogUri = state.getProp(ICEBERG_CATALOG_URI);
+
+ if (StringUtils.isBlank(database)) {
+ throw new IllegalArgumentException("iceberg.database.name is required");
+ }
+ if (StringUtils.isBlank(table)) {
+ throw new IllegalArgumentException("iceberg.table.name is required");
+ }
+ if (StringUtils.isBlank(catalogUri)) {
+ throw new IllegalArgumentException("iceberg.catalog.uri is required");
+ }
+ }
+
+ /**
+ * Set work unit weight for bin packing based on total file size.
+ * Ensures a minimum weight to prevent skew in bin packing.
+ *
+ * @param workUnit the work unit to set weight on
+ * @param totalSize total size of files in bytes
+ */
+ private void setWorkUnitWeight(WorkUnit workUnit, long totalSize) {
+ long weight = Math.max(totalSize, 1L);
+ workUnit.setProp(WORK_UNIT_WEIGHT, Long.toString(weight));
+ }
+
+ /**
+ * Apply bin packing to work units if configured.
+   * Groups work units into bins based on size constraints for better resource utilization.
+   *
+   * @param workUnits initial list of work units
+   * @param state source state containing bin packing configuration
+   * @return packed work units (or original if bin packing not configured)
+   */
+  private List<? extends WorkUnit> applyBinPacking(List<WorkUnit> workUnits, SourceState state) {
+    long maxSizePerBin = state.getPropAsLong(CopySource.MAX_SIZE_MULTI_WORKUNITS, 0);
+
+    if (maxSizePerBin <= 0) {
+      log.info("Bin packing disabled (maxSizePerBin={}), returning original work units", maxSizePerBin);
+      return workUnits;
+    }
+
+    log.info("Applying bin packing with maxSizePerBin={} bytes", maxSizePerBin);
+
+    List<? extends WorkUnit> packedWorkUnits = new WorstFitDecreasingBinPacking(maxSizePerBin)
+        .pack(workUnits, this.weighter);
+
+    log.info("Bin packing complete. Initial work units: {}, packed work units: {}, max size per bin: {} bytes",
+ workUnits.size(), packedWorkUnits.size(), maxSizePerBin);
+
+ return packedWorkUnits;
+ }
+
+ /**
+ * Log simulate mode information - what would be copied without executing.
+   * Provides detailed information about files, partitions, and sizes for dry-run validation.
+   *
+   * @param workUnits work units that would be executed
+   * @param filesWithPartitions discovered files with partition metadata
+   * @param state source state containing job configuration
+   */
+  private void logSimulateMode(List<WorkUnit> workUnits,
+                               List<IcebergTable.FilePathWithPartition> filesWithPartitions,
+ SourceState state) {
+ String database = state.getProp(ICEBERG_DATABASE_NAME);
+ String table = state.getProp(ICEBERG_TABLE_NAME);
+
+ String separator = StringUtils.repeat("=", 80);
+ String dashSeparator = StringUtils.repeat("-", 80);
+
+ log.info(separator);
+ log.info("SIMULATE MODE: Iceberg Table Copy Plan");
+ log.info(separator);
+ log.info("Source Table: {}.{}", database, table);
+ log.info("Total Files Discovered: {}", filesWithPartitions.size());
+ log.info("Total Work Units: {}", workUnits.size());
+
+ // Calculate total size
+ long totalSize = 0L;
+ Map<String, Long> partitionSizes = Maps.newLinkedHashMap();
+
+    for (IcebergTable.FilePathWithPartition fileWithPartition : filesWithPartitions) {
+      long fileSize = fileWithPartition.getFileSize();
+      totalSize += fileSize;
+
+      String partitionPath = fileWithPartition.getPartitionPath();
+      if (!partitionPath.isEmpty()) {
+        partitionSizes.put(partitionPath, partitionSizes.getOrDefault(partitionPath, 0L) + fileSize);
+      }
+    }
+
+    log.info("Total Data Size: {} bytes ({} MB)", totalSize, totalSize / (1024 * 1024));
+
+ if (!partitionSizes.isEmpty()) {
+ log.info(dashSeparator);
+ log.info("Partition Breakdown:");
+ for (Map.Entry<String, Long> entry : partitionSizes.entrySet()) {
+ long sizeInMB = entry.getValue() / (1024 * 1024);
+ log.info(" Partition: {} -> {} bytes ({} MB)", entry.getKey(),
entry.getValue(), sizeInMB);
+ }
+ }
+
+ log.info(dashSeparator);
+ log.info("Work Unit Distribution:");
+ for (int i = 0; i < Math.min(workUnits.size(), 10); i++) {
+ WorkUnit wu = workUnits.get(i);
+ String filesToPull =
wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+ int fileCount = filesToPull.isEmpty() ? 0 :
filesToPull.split(",").length;
+ long wuSize = wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE, 0L);
+ log.info(" WorkUnit[{}]: {} files, {} bytes ({} MB)", i, fileCount,
wuSize, wuSize / (1024 * 1024));
+ }
+
+ if (workUnits.size() > 10) {
+ log.info(" ... and {} more work units", workUnits.size() - 10);
+ }
+
+ log.info(separator);
+ log.info("Simulate mode: No data will be copied. Set
iceberg.simulate=false to execute.");
+ log.info(separator);
+ }
+
+}
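The lookback expansion at the heart of discoverPartitionFilePaths is easy to check in isolation. A self-contained JDK sketch mirroring the loop above (the "-00" literal corresponds to HOURLY_PARTITION_SUFFIX):

    import java.time.LocalDate;
    import java.util.ArrayList;
    import java.util.List;

    public class LookbackSketch {
      static List<String> partitionValues(String date, int lookbackDays, boolean hourly) {
        LocalDate start = LocalDate.parse(date); // throws DateTimeParseException on bad input
        List<String> values = new ArrayList<>();
        for (int i = 0; i < lookbackDays; i++) {
          String day = start.minusDays(i).toString();
          values.add(hourly ? day + "-00" : day);
        }
        return values;
      }

      public static void main(String[] args) {
        System.out.println(partitionValues("2025-04-03", 3, true));
        // [2025-04-03-00, 2025-04-02-00, 2025-04-01-00]
      }
    }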
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index 0f6c371e0e..a899f83c8c 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -23,6 +23,7 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
@@ -31,18 +32,23 @@ import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
@@ -52,6 +58,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.AllArgsConstructor;
@@ -344,4 +351,122 @@ public class IcebergTable {
}
}
+ /**
+ * Container for file path, partition information, and file size.
+ */
+ public static class FilePathWithPartition {
+ private final String filePath;
+ private final Map<String, String> partitionData;
+ private final long fileSize;
+
+ /**
+ * Creates a file path with partition and size information.
+ *
+ * @param filePath the absolute path to the data file
+     * @param partitionData map of partition key-value pairs (e.g., "datepartition" -> "2025-04-01")
+     * @param fileSize the size of the file in bytes
+     */
+    public FilePathWithPartition(String filePath, Map<String, String> partitionData, long fileSize) {
+ this.filePath = filePath;
+ this.partitionData = partitionData;
+ this.fileSize = fileSize;
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public Map<String, String> getPartitionData() {
+ return partitionData;
+ }
+
+ public long getFileSize() {
+ return fileSize;
+ }
+
+ public String getPartitionPath() {
+ if (partitionData == null || partitionData.isEmpty()) {
+ return "";
+ }
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry<String, String> entry : partitionData.entrySet()) {
+ if (sb.length() > 0) {
+ sb.append("/");
+ }
+ sb.append(entry.getKey()).append("=").append(entry.getValue());
+ }
+ return sb.toString();
+ }
+ }
+
+ /**
+   * Return absolute data file paths for files that match the provided Iceberg filter expression using TableScan.
+   */
+  public List<String> getDataFilePathsForFilter(org.apache.iceberg.expressions.Expression filterExpression) {
+    List<String> result = Lists.newArrayList();
+    org.apache.iceberg.TableScan scan = this.table.newScan().filter(filterExpression);
+    try (CloseableIterable<org.apache.iceberg.FileScanTask> tasks = scan.planFiles()) {
+      for (org.apache.iceberg.FileScanTask task : tasks) {
+        result.add(task.file().path().toString());
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to plan files for filter: " + filterExpression, ioe);
+ }
+ return result;
+ }
+
+ /**
+   * Return file paths with partition information and file size for files matching the filter expression.
+   * This method extracts partition values and file size from Iceberg metadata and associates them with file paths.
+ */
+ public List<FilePathWithPartition> getFilePathsWithPartitionsForFilter(
+ Expression filterExpression) {
+ List<FilePathWithPartition> result = Lists.newArrayList();
+ TableScan scan = this.table.newScan().filter(filterExpression);
+ PartitionSpec spec = this.table.spec();
+
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ for (FileScanTask task : tasks) {
+ String filePath = task.file().path().toString();
+ long fileSize = task.file().fileSizeInBytes();
+
+ // Extract partition data from the file's partition information
+ Map<String, String> partitionData = Maps.newLinkedHashMap();
+ if (task.file().partition() != null && !spec.isUnpartitioned()) {
+ StructLike partition = task.file().partition();
+ List<PartitionField> fields = spec.fields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ PartitionField field = fields.get(i);
+ String partitionName = field.name();
+ Object partitionValue = partition.get(i, Object.class);
+ if (partitionValue != null) {
+ partitionData.put(partitionName, partitionValue.toString());
+ }
+ }
+ }
+
+        result.add(new FilePathWithPartition(filePath, partitionData, fileSize));
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to plan files for filter: " + filterExpression, ioe);
+ }
+ return result;
+ }
+
+ /**
+   * Return data file paths for files that match any of the specified partition values for a given partition field.
+   */
+  public List<String> getDataFilePathsForPartitionValues(String partitionField, List<String> partitionValues) {
+    if (partitionValues == null || partitionValues.isEmpty()) {
+      return Lists.newArrayList();
+    }
+    org.apache.iceberg.expressions.Expression expr = null;
+    for (String val : partitionValues) {
+      org.apache.iceberg.expressions.Expression e = org.apache.iceberg.expressions.Expressions.equal(partitionField, val);
+      expr = (expr == null) ? e : org.apache.iceberg.expressions.Expressions.or(expr, e);
+ }
+ return getDataFilePathsForFilter(expr);
+ }
+
}
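getDataFilePathsForPartitionValues folds per-value equality predicates into one OR expression using a null seed; an equivalent formulation seeds the fold with Expressions.alwaysFalse() as the OR identity. A sketch assuming only iceberg-api on the classpath:

    import java.util.List;
    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class PartitionFilterSketch {
      // Match rows where the column equals any of the given values.
      static Expression anyOf(String column, List<String> values) {
        Expression expr = Expressions.alwaysFalse(); // identity element for OR
        for (String v : values) {
          expr = Expressions.or(expr, Expressions.equal(column, v));
        }
        return expr;
      }

      public static void main(String[] args) {
        Expression e = anyOf("datepartition", List.of("2025-04-01-00", "2025-04-02-00"));
        System.out.println(e); // prints the combined OR predicate
      }
    }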
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelperTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelperTest.java
new file mode 100644
index 0000000000..30e195449d
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelperTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+
+/**
+ * Tests for {@link IcebergFileStreamHelper}.
+ */
+public class IcebergFileStreamHelperTest {
+
+ private File tempDir;
+ private File testFile1;
+ private File testFile2;
+ private IcebergFileStreamHelper helper;
+ private State state;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ // Create temp directory and test files
+    tempDir = new File(System.getProperty("java.io.tmpdir"), "iceberg-helper-test-" + System.currentTimeMillis());
+ tempDir.mkdirs();
+
+ testFile1 = new File(tempDir, "data-file-1.parquet");
+ try (FileOutputStream fos = new FileOutputStream(testFile1)) {
+ fos.write("Test data for file 1".getBytes());
+ }
+
+ testFile2 = new File(tempDir, "data-file-2.parquet");
+ try (FileOutputStream fos = new FileOutputStream(testFile2)) {
+ fos.write("Test data for file 2 with more content".getBytes());
+ }
+
+ // Set up state with file paths
+ Properties properties = new Properties();
+ properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+ testFile1.getAbsolutePath() + "," + testFile2.getAbsolutePath());
+ state = new State(properties);
+
+ // Initialize helper
+ helper = new IcebergFileStreamHelper(state);
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ if (helper != null) {
+ helper.close();
+ }
+ // Clean up temp files
+ if (tempDir != null && tempDir.exists()) {
+ deleteDirectory(tempDir);
+ }
+ }
+
+ @Test
+ public void testConnect() throws Exception {
+ helper.connect();
+ // If no exception thrown, connection succeeded
+ Assert.assertTrue(true, "Connection should succeed");
+ }
+
+ @Test
+ public void testListFiles() throws Exception {
+ helper.connect();
+ List<String> files = helper.ls("");
+
+ Assert.assertNotNull(files, "File list should not be null");
+    Assert.assertEquals(files.size(), 2, "Should return 2 files from configuration");
+ Assert.assertTrue(files.contains(testFile1.getAbsolutePath()),
+ "Should contain first test file");
+ Assert.assertTrue(files.contains(testFile2.getAbsolutePath()),
+ "Should contain second test file");
+ }
+
+ @Test
+ public void testListFilesWithEmptyConfig() throws Exception {
+ State emptyState = new State();
+ emptyState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, "");
+    IcebergFileStreamHelper emptyHelper = new IcebergFileStreamHelper(emptyState);
+
+ try {
+ emptyHelper.connect();
+ List<String> files = emptyHelper.ls("");
+      Assert.assertTrue(files.isEmpty(), "Should return empty list for empty configuration");
+ } finally {
+ emptyHelper.close();
+ }
+ }
+
+ @Test
+ public void testGetFileStream() throws Exception {
+ helper.connect();
+
+ // Test getting file stream
+ InputStream is = helper.getFileStream(testFile1.getAbsolutePath());
+ Assert.assertNotNull(is, "File stream should not be null");
+
+ // Verify we can read from stream
+ byte[] buffer = new byte[1024];
+ int bytesRead = is.read(buffer);
+    Assert.assertTrue(bytesRead > 0, "Should be able to read bytes from stream");
+
+ String content = new String(buffer, 0, bytesRead);
+ Assert.assertEquals(content, "Test data for file 1",
+ "Stream content should match file content");
+
+ is.close();
+ }
+
+ @Test(expectedExceptions = FileBasedHelperException.class)
+ public void testGetFileStreamForNonExistentFile() throws Exception {
+ helper.connect();
+
+ // Test error handling for non-existent file
+ helper.getFileStream("/non/existent/path/file.parquet");
+ }
+
+ @Test
+ public void testGetFileSize() throws Exception {
+ helper.connect();
+
+ // Test getting file size
+ long size1 = helper.getFileSize(testFile1.getAbsolutePath());
+ long size2 = helper.getFileSize(testFile2.getAbsolutePath());
+
+ Assert.assertEquals(size1, "Test data for file 1".getBytes().length,
+ "File size should match actual file size");
+ Assert.assertEquals(size2, "Test data for file 2 with more
content".getBytes().length,
+ "File size should match actual file size");
+ Assert.assertTrue(size2 > size1, "Second file should be larger than
first");
+ }
+
+ @Test(expectedExceptions = FileBasedHelperException.class)
+ public void testGetFileSizeForNonExistentFile() throws Exception {
+ helper.connect();
+
+ // Test error handling for non-existent file
+ helper.getFileSize("/non/existent/path/file.parquet");
+ }
+
+ @Test
+ public void testGetFileMTime() throws Exception {
+ helper.connect();
+
+ // Test getting file modification time
+ long mtime1 = helper.getFileMTime(testFile1.getAbsolutePath());
+ long mtime2 = helper.getFileMTime(testFile2.getAbsolutePath());
+
+ Assert.assertTrue(mtime1 > 0, "Modification time should be positive");
+ Assert.assertTrue(mtime2 > 0, "Modification time should be positive");
+
+    // mtime2 should be >= mtime1 since it was created after (or same millisecond)
+ Assert.assertTrue(mtime2 >= mtime1,
+ "Second file's mtime should be >= first file's mtime");
+ }
+
+ @Test(expectedExceptions = FileBasedHelperException.class)
+ public void testGetFileMTimeForNonExistentFile() throws Exception {
+ helper.connect();
+
+ // Test error handling for non-existent file
+ helper.getFileMTime("/non/existent/path/file.parquet");
+ }
+
+
+ @Test
+ public void testHadoopConfigurationProperties() throws Exception {
+ // Test that Hadoop configuration properties are properly propagated
+ State stateWithHadoopProps = new State();
+    stateWithHadoopProps.setProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+ testFile1.getAbsolutePath());
+
+ // Add custom Hadoop properties
+ String testFsProperty = "fs.custom.impl";
+ String testFsValue = "org.example.CustomFileSystem";
+ String testHadoopProperty = "hadoop.custom.setting";
+ String testHadoopValue = "customValue";
+
+ stateWithHadoopProps.setProp(testFsProperty, testFsValue);
+ stateWithHadoopProps.setProp(testHadoopProperty, testHadoopValue);
+ stateWithHadoopProps.setProp("not.hadoop.property", "shouldNotBeInConfig");
+
+    IcebergFileStreamHelper helperWithProps = new IcebergFileStreamHelper(stateWithHadoopProps);
+
+ try {
+ helperWithProps.connect();
+
+      // Verify the properties were set in the Hadoop Configuration via reflection
+      java.lang.reflect.Field configField = IcebergFileStreamHelper.class.getDeclaredField("configuration");
+      configField.setAccessible(true);
+      org.apache.hadoop.conf.Configuration config =
+          (org.apache.hadoop.conf.Configuration) configField.get(helperWithProps);
+
+      Assert.assertEquals(config.get(testFsProperty), testFsValue,
+          "fs.* property should be propagated to Hadoop configuration");
+      Assert.assertEquals(config.get(testHadoopProperty), testHadoopValue,
+          "hadoop.* property should be propagated to Hadoop configuration");
+      Assert.assertNull(config.get("not.hadoop.property"),
+          "Non-Hadoop properties should not be propagated");
+ } finally {
+ helperWithProps.close();
+ }
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ helper.connect();
+
+ // Open a stream
+ InputStream is = helper.getFileStream(testFile1.getAbsolutePath());
+ Assert.assertNotNull(is);
+ is.close();
+
+ // Close helper
+ helper.close();
+
+ // After close, operations should fail
+ try {
+ helper.getFileStream(testFile1.getAbsolutePath());
+ } catch (Exception e) {
+      Assert.assertTrue(e instanceof FileBasedHelperException || e instanceof IOException,
+ "Should throw appropriate exception after close");
+ }
+ }
+
+
+
+ @Test
+ public void testEmptyFile() throws Exception {
+ // Create empty file
+ File emptyFile = new File(tempDir, "empty.parquet");
+ emptyFile.createNewFile();
+
+ helper.connect();
+
+ // Test edge case: empty file should have size 0
+ long size = helper.getFileSize(emptyFile.getAbsolutePath());
+ Assert.assertEquals(size, 0, "Empty file should have size 0");
+ }
+
+ @Test
+ public void testCrossSchemeFileAccess() throws Exception {
+ helper.connect();
+
+ // Test that getFileSystemForPath correctly handles different schemes
+ // Test 1: Local file path (no scheme)
+ String localPath = testFile1.getAbsolutePath();
+ InputStream localStream = helper.getFileStream(localPath);
+ Assert.assertNotNull(localStream, "Should open stream for local file
path");
+ localStream.close();
+
+ // Test 2: file:// scheme
+ String fileScheme = "file://" + testFile1.getAbsolutePath();
+ InputStream fileStream = helper.getFileStream(fileScheme);
+ Assert.assertNotNull(fileStream, "Should open stream for file:// scheme");
+ fileStream.close();
+ }
+
+ private void deleteDirectory(File directory) {
+ File[] files = directory.listFiles();
+ if (files != null) {
+ for (File file : files) {
+ if (file.isDirectory()) {
+ deleteDirectory(file);
+ } else {
+ file.delete();
+ }
+ }
+ }
+ directory.delete();
+ }
+
+}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
new file mode 100644
index 0000000000..b4e6bc7eae
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java
@@ -0,0 +1,1116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.data.management.copy.iceberg.IcebergTable.FilePathWithPartition;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hive.HiveMetastoreTest;
+import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.*;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Unit tests for {@link IcebergSource}.
+ */
+public class IcebergSourceTest {
+
+ @Mock
+ private IcebergCatalog mockCatalog;
+
+ @Mock
+ private IcebergTable mockTable;
+
+ @Mock
+ private IcebergSnapshotInfo mockSnapshot;
+
+ private IcebergSource icebergSource;
+ private SourceState sourceState;
+ private Properties properties;
+
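+ // Handle returned by MockitoAnnotations.openMocks(); closed in tearDown() to release the mocks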
+ private AutoCloseable mocks;
+
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ mocks = MockitoAnnotations.openMocks(this);
+
+ // Initialize IcebergSource
+ this.icebergSource = new IcebergSource();
+
+ // Set up basic properties
+ this.properties = new Properties();
+ properties.setProperty(IcebergSource.ICEBERG_DATABASE_NAME, "test_db");
+ properties.setProperty(IcebergSource.ICEBERG_TABLE_NAME, "test_table");
+ properties.setProperty(IcebergSource.ICEBERG_CATALOG_URI, "https://<test>.com/api/v1");
+ properties.setProperty(IcebergSource.ICEBERG_CATALOG_CLASS, "org.apache.gobblin.data.management.copy.iceberg.TestCatalog");
+
+ // Create SourceState
+ this.sourceState = new SourceState(new State(properties));
+ // Set a default top-level broker required by LineageInfo
+ com.typesafe.config.Config emptyConfig = com.typesafe.config.ConfigFactory.empty();
+ this.sourceState.setBroker(
+ SharedResourcesBrokerFactory.createDefaultTopLevelBroker(
+ emptyConfig,
+ GobblinScopeTypes.GLOBAL.defaultScopeInstance()));
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ if (mocks != null) {
+ mocks.close();
+ }
+ }
+
+ @Test
+ public void testFileStreamingModeWorkUnitCreation() throws Exception {
+ // Set up file streaming mode
+ properties.setProperty(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, "false");
+
+ // Mock file discovery via snapshot fallback (no filter)
+ List<String> inputPaths = Arrays.asList(
+ "/data/warehouse/test_table/data/file1.parquet",
+ "/data/warehouse/test_table/data/file2.parquet",
+ "/data/warehouse/test_table/data/file3.parquet",
+ "/data/warehouse/test_table/metadata/manifest-list.avro",
+ "/data/warehouse/test_table/metadata/manifest1.avro"
+ );
+ setupMockFileDiscovery(inputPaths);
+
+ // Discover data-only paths via snapshot manifest info
+ List<String> dataFilePaths = inputPaths.stream()
+ .filter(p -> p.endsWith(".parquet") || p.endsWith(".orc") ||
p.endsWith(".avro"))
+ .filter(p -> !p.contains("manifest"))
+ .collect(Collectors.toList());
+
+ // Convert to FilePathWithPartition (no partition info for snapshot-based discovery)
+ List<FilePathWithPartition> filesWithPartitions =
+ dataFilePaths.stream()
+ .map(path -> new FilePathWithPartition(
+ path, new java.util.HashMap<>(), 0L))
+ .collect(Collectors.toList());
+
+ // Invoke private createWorkUnitsFromFiles via reflection
+ Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithPartitions, sourceState, mockTable);
+
+ // Verify single work unit contains all 3 data files by default (filesPerWorkUnit default=10)
+ Assert.assertEquals(workUnits.size(), 1, "Should create 1 work unit");
+ WorkUnit wu = workUnits.get(0);
+ String filesToPull = wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
+ Assert.assertNotNull(filesToPull);
+ Assert.assertEquals(filesToPull.split(",").length, 3);
+
+ // Verify extract info
+ Assert.assertEquals(wu.getExtract().getNamespace(), "iceberg");
+ Assert.assertEquals(wu.getExtract().getTable(), "test_table");
+ Assert.assertEquals(wu.getExtract().getType(), Extract.TableType.SNAPSHOT_ONLY);
+ }
+
+ @Test
+ public void testFileStreamingModeExtractorSelection() throws Exception {
+ // Set up file streaming mode
+ WorkUnit dummyWu = WorkUnit.createEmpty();
+ State jobState = new State(properties);
+ WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState);
+ workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, "false");
+
+ Extractor<String, FileAwareInputStream> extractor = icebergSource.getExtractor(workUnitState);
+ // Verify correct extractor type
+ Assert.assertTrue(extractor instanceof IcebergFileStreamExtractor,
+ "File streaming mode should return IcebergFileStreamExtractor");
+ }
+
+ @Test
+ public void testRecordProcessingExtractorThrows() throws Exception {
+ // Set up record processing mode
+ WorkUnit dummyWu = WorkUnit.createEmpty();
+ State jobState = new State(properties);
+ WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState);
+ workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, "true");
+
+ try {
+ icebergSource.getExtractor(workUnitState);
+ Assert.fail("Expected UnsupportedOperationException for record
processing mode");
+ } catch (UnsupportedOperationException expected) {
+ // Expected exception
+ }
+ }
+
+ @Test
+ public void testConfigurationValidation() throws Exception {
+ // Test missing database name via direct validateConfiguration method
+ properties.remove(IcebergSource.ICEBERG_DATABASE_NAME);
+ sourceState = new SourceState(new State(properties));
+
+ // Use reflection to call private validateConfiguration method
+ Method m = IcebergSource.class.getDeclaredMethod("validateConfiguration",
SourceState.class);
+ m.setAccessible(true);
+
+ try {
+ m.invoke(icebergSource, sourceState);
+ Assert.fail("Should throw exception for missing database name");
+ } catch (java.lang.reflect.InvocationTargetException e) {
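+ // Reflection wraps the thrown IllegalArgumentException in an InvocationTargetException, so assert on the cause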
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ Assert.assertTrue(e.getCause().getMessage().contains("iceberg.database.name is required"));
+ }
+ }
+
+ @Test
+ public void testFileGrouping() throws Exception {
+ // Test with more files than files per work unit
+ properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "3");
+ sourceState = new SourceState(new State(properties));
+
+ // Mock 6 files to test grouping
+ List<String> dataFilePaths = Arrays.asList(
+ "file1.parquet", "file2.parquet", "file3.parquet",
+ "file4.parquet", "file5.parquet", "file6.parquet"
+ );
+
+ // Convert to FilePathWithPartition
+ List<FilePathWithPartition> filesWithPartitions =
+ dataFilePaths.stream()
+ .map(path -> new FilePathWithPartition(
+ path, new java.util.HashMap<>(), 0L))
+ .collect(Collectors.toList());
+
+ // Setup table mock
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ // Use reflection to call createWorkUnitsFromFiles directly on data files
+ Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithPartitions, sourceState, mockTable);
+
+ // Should create 2 work units: [3 files], [3 files]
+ Assert.assertEquals(workUnits.size(), 2, "Should create 2 work units for 6
files with files.per.workunit=3");
+
+ // Verify file distribution
+ int totalFiles = 0;
+ for (WorkUnit workUnit : workUnits) {
+ String filesToPull = workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
+ int filesInThisUnit = filesToPull.split(",").length;
+ totalFiles += filesInThisUnit;
+ Assert.assertTrue(filesInThisUnit <= 3, "No work unit should have more
than 3 files");
+ }
+ Assert.assertEquals(totalFiles, 6, "Total files across all work units
should be 6");
+ }
+
+ /**
+ * Helper method to set up mock file discovery
+ */
+ private void setupMockFileDiscovery(List<String> filePaths) throws Exception {
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ when(mockCatalog.openTable("test_db", "test_table")).thenReturn(mockTable);
+ when(mockCatalog.tableAlreadyExists(mockTable)).thenReturn(true);
+ when(mockTable.getCurrentSnapshotInfo()).thenReturn(mockSnapshot);
+
+ // Set up snapshot to return the specified file paths
+ when(mockSnapshot.getManifestListPath()).thenReturn("manifest-list.avro");
+ when(mockSnapshot.getMetadataPath()).thenReturn(Optional.empty());
+
+ // Create manifest info with data files
+ List<String> dataFiles = filePaths.stream()
+ .filter(path -> path.endsWith(".parquet") || path.endsWith(".orc") ||
path.endsWith(".avro"))
+ .filter(path -> !path.contains("manifest"))
+ .collect(Collectors.toList());
+
+ IcebergSnapshotInfo.ManifestFileInfo manifestInfo = new IcebergSnapshotInfo.ManifestFileInfo(
+ "manifest1.avro", dataFiles);
+ when(mockSnapshot.getManifestFiles()).thenReturn(Arrays.asList(manifestInfo));
+ }
+
+ @Test
+ public void testLookbackPeriodLogic() throws Exception {
+ // Test that lookback period correctly discovers multiple days of partitions
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
+ properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+ sourceState = new SourceState(new State(properties));
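+ // With filter date 2025-04-03 and a 3-day lookback, discovery is expected to span 2025-04-01 through 2025-04-03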
+
+ // Mock 3 days of partitions
+ List<FilePathWithPartition> filesFor3Days = Arrays.asList(
+ new FilePathWithPartition(
+ "/data/file1.parquet", createPartitionMap("datepartition",
"2025-04-03"), 1000L),
+ new FilePathWithPartition(
+ "/data/file2.parquet", createPartitionMap("datepartition",
"2025-04-02"), 1000L),
+ new FilePathWithPartition(
+ "/data/file3.parquet", createPartitionMap("datepartition",
"2025-04-01"), 1000L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+ .thenReturn(filesFor3Days);
+
+ // Test discovery
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+ SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<FilePathWithPartition> discovered =
+ (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+ // Verify all 3 days discovered
+ Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with
lookback=3");
+
+ // Verify partition values are set correctly
+ String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+ Assert.assertNotNull(partitionValues, "Partition values should be set");
+ // Should contain 3 dates: 2025-04-03, 2025-04-02, 2025-04-01
+ String[] dates = partitionValues.split(",");
+ Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+ }
+
+ @Test
+ public void testCurrentDatePlaceholder() throws Exception {
+ // Test that CURRENT_DATE placeholder is resolved to current date with hourly suffix (default)
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "CURRENT_DATE");
+ properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+ sourceState = new SourceState(new State(properties));
+
+ // Mock today's partition with hourly format
+ String today = java.time.LocalDate.now().toString();
+ String todayHourly = today + "-00";
+ List<FilePathWithPartition> todayFiles = Arrays.asList(
+ new FilePathWithPartition(
+ "/data/file1.parquet", createPartitionMap("datepartition",
todayHourly), 1000L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+ .thenReturn(todayFiles);
+
+ // Test discovery
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+ SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<FilePathWithPartition> discovered =
+ (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+ // Verify today's partition is discovered
+ Assert.assertEquals(discovered.size(), 1, "Should discover today's
partition");
+
+ // Verify partition value has hourly format (default behavior)
+ String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+ Assert.assertNotNull(partitionValues, "Partition values should be set");
+ Assert.assertEquals(partitionValues, todayHourly, "Should resolve to
today's date with -00 suffix");
+ }
+
+ @Test
+ public void testInvalidDateFormatFails() throws Exception {
+ // Test that invalid date format throws proper exception
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "invalid-date");
// Invalid date format
+ sourceState = new SourceState(new State(properties));
+
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+ SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+
+ try {
+ m.invoke(icebergSource, sourceState, mockTable);
+ Assert.fail("Should throw exception for invalid date format");
+ } catch (java.lang.reflect.InvocationTargetException e) {
+ // Unwrap the exception from reflection
+ // Should get IllegalArgumentException wrapping the DateTimeParseException
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException,
+ "Should throw IllegalArgumentException for invalid date format");
+ Assert.assertTrue(e.getCause().getMessage().contains("Invalid date
format"),
+ "Error message should indicate invalid date format");
+ Assert.assertTrue(e.getCause().getMessage().contains("yyyy-MM-dd"),
+ "Error message should indicate expected format");
+ // Verify the cause is DateTimeParseException
+ Assert.assertTrue(e.getCause().getCause() instanceof java.time.format.DateTimeParseException,
+ "Root cause should be DateTimeParseException");
+ }
+ }
+
+ @Test
+ public void testPartitionFilterConfiguration() throws Exception {
+ // Test with partition filtering enabled
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+ properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+ sourceState = new SourceState(new State(properties));
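+ // A 3-day lookback from 2025-04-01 is expected to span 2025-03-30 through 2025-04-01, covering all four mocked files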
+
+ // Mock partition-aware file discovery with FilePathWithPartition
+ List<FilePathWithPartition> partitionFiles = Arrays.asList(
+ new FilePathWithPartition(
+ "/data/uuid1.parquet", createPartitionMap("datepartition",
"2025-04-01"), 0L),
+ new FilePathWithPartition(
+ "/data/uuid2.parquet", createPartitionMap("datepartition",
"2025-04-01"), 0L),
+ new FilePathWithPartition(
+ "/data/uuid3.parquet", createPartitionMap("datepartition",
"2025-03-31"), 0L),
+ new FilePathWithPartition(
+ "/data/uuid4.parquet", createPartitionMap("datepartition",
"2025-03-30"), 0L)
+ );
+
+ // Mock the table to return partition-specific files with metadata
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+ .thenReturn(partitionFiles);
+
+ // Use reflection to test discoverPartitionFilePaths
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<FilePathWithPartition> discoveredFiles =
+ (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+ // Verify partition filter was applied
+ Assert.assertEquals(discoveredFiles.size(), 4, "Should discover files from
filtered partitions");
+ }
+
+ @Test
+ public void testPartitionInfoPropagation() throws Exception {
+ // Test that partition info is propagated to work units
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
+ sourceState = new SourceState(new State(properties));
+
+ List<FilePathWithPartition> filesWithPartitions = Arrays.asList(
+ new FilePathWithPartition(
+ "/data/uuid1.parquet", createPartitionMap("datepartition",
"2025-04-01"), 0L),
+ new FilePathWithPartition(
+ "/data/uuid2.parquet", createPartitionMap("datepartition",
"2025-04-01"), 0L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ // Set partition info on source state (simulating discoverPartitionFilePaths behavior)
+ sourceState.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition");
+ sourceState.setProp(IcebergSource.ICEBERG_PARTITION_VALUES, "2025-04-01");
+
+ // Invoke createWorkUnitsFromFiles
+ Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithPartitions, sourceState, mockTable);
+
+ // Verify partition info is in work unit
+ Assert.assertEquals(workUnits.size(), 1);
+ WorkUnit wu = workUnits.get(0);
+ Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_KEY), "datepartition");
+ Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_VALUES), "2025-04-01");
+
+ // Verify partition path mapping is stored
+ Assert.assertNotNull(wu.getProp(IcebergSource.ICEBERG_FILE_PARTITION_PATH),
+ "Partition path mapping should be stored in work unit");
+ }
+
+ @Test
+ public void testNoFilterPreservesPartitionMetadata() throws Exception {
+ // Test that when filter is disabled, partition metadata is still preserved for partitioned tables
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "false");
+ sourceState = new SourceState(new State(properties));
+
+ // Mock all files from a partitioned table with partition metadata
+ List<FilePathWithPartition> allFilesWithPartitions = Arrays.asList(
+ new FilePathWithPartition(
+ "/warehouse/db/table/datepartition=2025-04-01/file1.parquet",
+ createPartitionMap("datepartition", "2025-04-01"), 1000L),
+ new FilePathWithPartition(
+ "/warehouse/db/table/datepartition=2025-04-01/file2.parquet",
+ createPartitionMap("datepartition", "2025-04-01"), 1500L),
+ new FilePathWithPartition(
+ "/warehouse/db/table/datepartition=2025-04-02/file3.parquet",
+ createPartitionMap("datepartition", "2025-04-02"), 2000L),
+ new FilePathWithPartition(
+ "/warehouse/db/table/datepartition=2025-04-03/file4.parquet",
+ createPartitionMap("datepartition", "2025-04-03"), 2500L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ // Mock TableScan with alwaysTrue() filter to return all files with partition metadata
+ when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+ .thenReturn(allFilesWithPartitions);
+
+ // Use reflection to test discoverPartitionFilePaths
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<FilePathWithPartition> discoveredFiles = (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+ // Verify all files discovered with partition metadata preserved
+ Assert.assertEquals(discoveredFiles.size(), 4, "Should discover all data
files");
+
+ // Verify partition metadata is preserved for each file
+ for (FilePathWithPartition file : discoveredFiles) {
+ Assert.assertNotNull(file.getPartitionData(), "Partition data should be
present");
+ Assert.assertFalse(file.getPartitionData().isEmpty(), "Partition data
should not be empty");
+ Assert.assertTrue(file.getPartitionData().containsKey("datepartition"),
+ "Should have datepartition key");
+ Assert.assertFalse(file.getPartitionPath().isEmpty(),
+ "Partition path should not be empty");
+ Assert.assertTrue(file.getPartitionPath().startsWith("datepartition="),
+ "Partition path should be in format: datepartition=<date>");
+ }
+
+ // Verify files are from different partitions
+ java.util.Set<String> uniquePartitions = discoveredFiles.stream()
+ .map(f -> f.getPartitionData().get("datepartition"))
+ .collect(java.util.stream.Collectors.toSet());
+ Assert.assertEquals(uniquePartitions.size(), 3, "Should have files from 3
different partitions");
+ }
+
+ @Test
+ public void testEmptyFileList() throws Exception {
+ // Test handling of empty file list
+ List<FilePathWithPartition> emptyList = Arrays.asList();
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, emptyList, sourceState, mockTable);
+
+ // Should return empty list
+ Assert.assertTrue(workUnits.isEmpty(), "Should return empty work unit list
for empty file list");
+ }
+
+ @Test
+ public void testSingleFilePerWorkUnit() throws Exception {
+ // Test with files per work unit = 1
+ properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
+ sourceState = new SourceState(new State(properties));
+
+ List<FilePathWithPartition> filesWithPartitions = Arrays.asList(
+ new FilePathWithPartition(
+ "file1.parquet", new java.util.HashMap<>(), 0L),
+ new FilePathWithPartition(
+ "file2.parquet", new java.util.HashMap<>(), 0L),
+ new FilePathWithPartition(
+ "file3.parquet", new java.util.HashMap<>(), 0L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithPartitions, sourceState, mockTable);
+
+ // Should create 3 work units, one per file
+ Assert.assertEquals(workUnits.size(), 3, "Should create one work unit per
file");
+
+ for (WorkUnit wu : workUnits) {
+ String filesToPull = wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL);
+ Assert.assertEquals(filesToPull.split(",").length, 1, "Each work unit should have exactly 1 file");
+ }
+ }
+
+ @Test
+ public void testFilterEnabledWithoutDate() throws Exception {
+ // Test that enabling filter without date value throws exception
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.remove(IcebergSource.ICEBERG_FILTER_DATE);
+ sourceState = new SourceState(new State(properties));
+
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+
+ try {
+ m.invoke(icebergSource, sourceState, mockTable);
+ Assert.fail("Expected IllegalArgumentException for missing filter date");
+ } catch (java.lang.reflect.InvocationTargetException e) {
+ // Unwrap the exception from reflection
+ Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
+ Assert.assertTrue(e.getCause().getMessage().contains("iceberg.filter.date is required"));
+ }
+ }
+
+ /**
+ * Integration test class that creates real Iceberg tables with multiple partitions
+ * and tests partition-specific data file fetching.
+ */
+ public static class IcebergSourcePartitionIntegrationTest extends HiveMetastoreTest {
+
+ private static final String TEST_DB_NAME = "test_partition_db";
+ private TableIdentifier partitionedTableId;
+ private Table partitionedTable;
+ private IcebergTable icebergTable;
+
+ // Schema with partition field 'datepartition'
+ private static final org.apache.iceberg.shaded.org.apache.avro.Schema partitionedAvroSchema =
+ SchemaBuilder.record("partitioned_test")
+ .fields()
+ .name("id").type().longType().noDefault()
+ .name("datepartition").type().stringType().noDefault()
+ .endRecord();
+ private static final Schema partitionedIcebergSchema =
+ AvroSchemaUtil.toIceberg(partitionedAvroSchema);
+ private static final PartitionSpec partitionSpec =
+ PartitionSpec.builderFor(partitionedIcebergSchema)
+ .identity("datepartition")
+ .build();
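+ // Identity partitioning on 'datepartition' mirrors the date-partitioned layout that the partition filter tests target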
+
+ @BeforeClass
+ public void setUp() throws Exception {
+ // Start metastore and create test namespace
+ try {
+ startMetastore();
+ } catch (Exception e) {
+ // Metastore may already be started if another test class ran first
+ // The startMetastore() method creates a default 'hivedb', which fails if it already exists
+ }
+ catalog.createNamespace(Namespace.of(TEST_DB_NAME));
+ }
+
+ @BeforeMethod
+ public void setUpEachTest() {
+ // Create a partitioned table for each test
+ partitionedTableId = TableIdentifier.of(TEST_DB_NAME, "partitioned_table");
+ partitionedTable = catalog.createTable(partitionedTableId, partitionedIcebergSchema, partitionSpec,
+ java.util.Collections.singletonMap("format-version", "2"));
+
+ // Add data files for multiple partitions
+ addDataFilesForPartition("2025-04-01", java.util.Arrays.asList(
+ "/data/warehouse/test_db/partitioned_table/datepartition=2025-04-01/file1.parquet",
+ "/data/warehouse/test_db/partitioned_table/datepartition=2025-04-01/file2.parquet"
+ ));
+ addDataFilesForPartition("2025-04-02", java.util.Arrays.asList(
+ "/data/warehouse/test_db/partitioned_table/datepartition=2025-04-02/file3.parquet"
+ ));
+ addDataFilesForPartition("2025-04-03", java.util.Arrays.asList(
+ "/data/warehouse/test_db/partitioned_table/datepartition=2025-04-03/file4.parquet",
+ "/data/warehouse/test_db/partitioned_table/datepartition=2025-04-03/file5.parquet",
+ "/data/warehouse/test_db/partitioned_table/datepartition=2025-04-03/file6.parquet"
+ ));
+
+ // Create IcebergTable wrapper
+ icebergTable = new IcebergTable(
+ partitionedTableId,
+ catalog.newTableOps(partitionedTableId),
+ catalog.getConf().get(CatalogProperties.URI),
+ catalog.loadTable(partitionedTableId)
+ );
+ }
+
+ @AfterMethod
+ public void cleanUpEachTest() {
+ // Clean up partitioned table
+ if (partitionedTableId != null && catalog != null) {
+ try {
+ catalog.dropTable(partitionedTableId);
+ } catch (Exception e) {
+ // Ignore cleanup errors
+ }
+ }
+ }
+
+ @Test
+ public void testGetDataFilePathsForSinglePartition() throws Exception {
+ // Test fetching data files for a single partition
+ List<String> dt20250402Files = icebergTable.getDataFilePathsForPartitionValues("datepartition",
+ java.util.Collections.singletonList("2025-04-02"));
+
+ // Should return exactly 1 file for datepartition=2025-04-02
+ Assert.assertEquals(dt20250402Files.size(), 1,
+ "Should return exactly 1 file for partition datepartition=2025-04-02");
+ Assert.assertTrue(dt20250402Files.get(0).contains("datepartition=2025-04-02"),
+ "File path should contain partition value");
+ Assert.assertTrue(dt20250402Files.get(0).contains("file3.parquet"),
+ "File path should be file3.parquet");
+ }
+
+ @Test
+ public void testGetDataFilePathsForMultiplePartitions() throws Exception {
+ // Test fetching data files for multiple partitions (OR filter)
+ List<String> multiPartitionFiles = icebergTable.getDataFilePathsForPartitionValues("datepartition",
+ java.util.Arrays.asList("2025-04-01", "2025-04-03"));
+
+ // Should return 2 files from datepartition=2025-04-01 and 3 files from datepartition=2025-04-03
+ Assert.assertEquals(multiPartitionFiles.size(), 5,
+ "Should return 5 files (2 from datepartition=2025-04-01 + 3 from
datepartition=2025-04-03)");
+
+ // Verify files from both partitions are present
+ long dt20250401Count = multiPartitionFiles.stream()
+ .filter(path -> path.contains("datepartition=2025-04-01"))
+ .count();
+ long dt20250403Count = multiPartitionFiles.stream()
+ .filter(path -> path.contains("datepartition=2025-04-03"))
+ .count();
+
+ Assert.assertEquals(dt20250401Count, 2, "Should have 2 files from
datepartition=2025-04-01");
+ Assert.assertEquals(dt20250403Count, 3, "Should have 3 files from
datepartition=2025-04-03");
+
+ // Verify no files from datepartition=2025-04-02
+ boolean hasFilesFromExcludedPartition = multiPartitionFiles.stream()
+ .anyMatch(path -> path.contains("datepartition=2025-04-02"));
+ Assert.assertFalse(hasFilesFromExcludedPartition,
+ "Should not include files from datepartition=2025-04-02");
+ }
+
+ @Test
+ public void testGetDataFilePathsForAllPartitions() throws Exception {
+ // Test fetching all data files across all partitions
+ List<String> allFiles = icebergTable.getDataFilePathsForPartitionValues("datepartition",
+ java.util.Arrays.asList("2025-04-01", "2025-04-02", "2025-04-03"));
+
+ // Should return all 6 files (2 + 1 + 3)
+ Assert.assertEquals(allFiles.size(), 6,
+ "Should return all 6 files across all partitions");
+
+ // Verify distribution across partitions
+ long dt20250401Count = allFiles.stream().filter(p -> p.contains("datepartition=2025-04-01")).count();
+ long dt20250402Count = allFiles.stream().filter(p -> p.contains("datepartition=2025-04-02")).count();
+ long dt20250403Count = allFiles.stream().filter(p -> p.contains("datepartition=2025-04-03")).count();
+
+ Assert.assertEquals(dt20250401Count, 2);
+ Assert.assertEquals(dt20250402Count, 1);
+ Assert.assertEquals(dt20250403Count, 3);
+ }
+
+ @Test
+ public void testGetDataFilePathsForNonExistentPartition() throws Exception {
+ // Test fetching data files for a partition that doesn't exist
+ List<String> noFiles = icebergTable.getDataFilePathsForPartitionValues("datepartition",
+ java.util.Collections.singletonList("2025-12-31"));
+
+ // Should return empty list
+ Assert.assertTrue(noFiles.isEmpty(),
+ "Should return empty list for non-existent partition");
+ }
+
+ /**
+ * Helper method to add data files for a specific partition
+ */
+ private void addDataFilesForPartition(String partitionValue, List<String> filePaths) {
+ PartitionData partitionData =
+ new PartitionData(partitionSpec.partitionType());
+ partitionData.set(0, partitionValue);
+
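+ // DataFiles.builder only records file metadata (path, size, record count), so these parquet paths never need to exist on disk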
+ AppendFiles append = partitionedTable.newAppend();
+ for (String filePath : filePaths) {
+ DataFile dataFile = DataFiles.builder(partitionSpec)
+ .withPath(filePath)
+ .withFileSizeInBytes(100L)
+ .withRecordCount(10L)
+ .withPartition(partitionData)
+ .withFormat(FileFormat.PARQUET)
+ .build();
+ append.appendFile(dataFile);
+ }
+ append.commit();
+ }
+ }
+
+ /**
+ * Helper method to create partition map for testing
+ */
+ private Map<String, String> createPartitionMap(String key, String value) {
+ Map<String, String> partitionMap = new HashMap<>();
+ partitionMap.put(key, value);
+ return partitionMap;
+ }
+
+ @Test
+ public void testWorkUnitSizeTracking() throws Exception {
+ // Test that work units include file size information for dynamic scaling
+ properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "2");
+ sourceState = new SourceState(new State(properties));
+
+ // Create files with different sizes
+ List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+ new FilePathWithPartition(
+ "file1.parquet", new HashMap<>(), 1073741824L), // 1 GB
+ new FilePathWithPartition(
+ "file2.parquet", new HashMap<>(), 536870912L), // 512 MB
+ new FilePathWithPartition(
+ "file3.parquet", new HashMap<>(), 2147483648L), // 2 GB
+ new FilePathWithPartition(
+ "file4.parquet", new HashMap<>(), 268435456L) // 256 MB
+ );
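+ // With 2 files per work unit and in-order grouping, expect [file1, file2] and [file3, file4]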
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ // Invoke createWorkUnitsFromFiles
+ Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithSizes, sourceState, mockTable);
+
+ // Should create 2 work units (4 files / 2 files per unit)
+ Assert.assertEquals(workUnits.size(), 2, "Should create 2 work units");
+
+ // Verify each work unit has WORK_UNIT_SIZE set
+ WorkUnit wu1 = workUnits.get(0);
+ long wu1Size = wu1.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+ Assert.assertEquals(wu1Size, 1073741824L + 536870912L, // 1 GB + 512 MB
+ "WorkUnit 1 should have total size of its files");
+
+ WorkUnit wu2 = workUnits.get(1);
+ long wu2Size = wu2.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+ Assert.assertEquals(wu2Size, 2147483648L + 268435456L, // 2 GB + 256 MB
+ "WorkUnit 2 should have total size of its files");
+
+ // Verify work unit weight is set for bin packing
+ String weight1 = wu1.getProp("iceberg.workUnitWeight");
+ Assert.assertNotNull(weight1, "Work unit weight should be set");
+ Assert.assertEquals(Long.parseLong(weight1), wu1Size, "Weight should equal
total size");
+
+ String weight2 = wu2.getProp("iceberg.workUnitWeight");
+ Assert.assertNotNull(weight2, "Work unit weight should be set");
+ Assert.assertEquals(Long.parseLong(weight2), wu2Size, "Weight should equal
total size");
+ }
+
+ @Test
+ public void testBinPackingDisabled() throws Exception {
+ // Test that bin packing is skipped when not configured
+ properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
+ // Do NOT set binPacking.maxSizePerBin - bin packing should be disabled
+ sourceState = new SourceState(new State(properties));
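+ // applyBinPacking is expected to pass work units through unchanged when no max bin size is configured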
+
+ List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+ new FilePathWithPartition(
+ "file1.parquet", new HashMap<>(), 1000L),
+ new FilePathWithPartition(
+ "file2.parquet", new HashMap<>(), 2000L),
+ new FilePathWithPartition(
+ "file3.parquet", new HashMap<>(), 3000L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ // Create work units
+ Method createMethod = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ createMethod.setAccessible(true);
+ List<WorkUnit> initialWorkUnits = (List<WorkUnit>) createMethod.invoke(icebergSource, filesWithSizes, sourceState, mockTable);
+
+ // Apply bin packing (should return original list)
+ Method binPackMethod = IcebergSource.class.getDeclaredMethod("applyBinPacking", List.class, SourceState.class);
+ binPackMethod.setAccessible(true);
+ List<? extends WorkUnit> packedWorkUnits = (List<? extends WorkUnit>) binPackMethod.invoke(icebergSource, initialWorkUnits, sourceState);
+
+ // Should return same number of work units (no packing applied)
+ Assert.assertEquals(packedWorkUnits.size(), initialWorkUnits.size(),
+ "Bin packing should be disabled, returning original work units");
+ Assert.assertEquals(packedWorkUnits.size(), 3, "Should have 3 unpacked
work units");
+ }
+
+ @Test
+ public void testBinPackingEnabled() throws Exception {
+ // Test that bin packing groups work units by size using the WorstFitDecreasing algorithm
+ properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "1");
+ // Use CopySource bin packing configuration key for consistency
+ properties.setProperty(CopySource.MAX_SIZE_MULTI_WORKUNITS, "5000"); //
5KB max per bin
+ sourceState = new SourceState(new State(properties));
+
+ // Create 6 work units with sizes: 1KB, 1KB, 2KB, 2KB, 3KB, 3KB (total 12KB)
+ // WorstFitDecreasing algorithm packs largest items first:
+ // Expected packing with 5KB limit:
+ // Bin 1: 3KB + 2KB = 5KB
+ // Bin 2: 3KB + 2KB = 5KB
+ // Bin 3: 1KB + 1KB = 2KB
+ // Total: 3 bins
+ List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+ new FilePathWithPartition(
+ "file1.parquet", new HashMap<>(), 1000L),
+ new FilePathWithPartition(
+ "file2.parquet", new HashMap<>(), 1000L),
+ new FilePathWithPartition(
+ "file3.parquet", new HashMap<>(), 2000L),
+ new FilePathWithPartition(
+ "file4.parquet", new HashMap<>(), 2000L),
+ new FilePathWithPartition(
+ "file5.parquet", new HashMap<>(), 3000L),
+ new FilePathWithPartition(
+ "file6.parquet", new HashMap<>(), 3000L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ // Create initial work units (1 file per work unit)
+ Method createMethod = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ createMethod.setAccessible(true);
+ List<WorkUnit> initialWorkUnits = (List<WorkUnit>) createMethod.invoke(icebergSource, filesWithSizes, sourceState, mockTable);
+
+ Assert.assertEquals(initialWorkUnits.size(), 6, "Should create 6 initial
work units");
+
+ // Apply bin packing
+ Method binPackMethod = IcebergSource.class.getDeclaredMethod("applyBinPacking", List.class, SourceState.class);
+ binPackMethod.setAccessible(true);
+ List<? extends WorkUnit> packedWorkUnits = (List<? extends WorkUnit>) binPackMethod.invoke(icebergSource, initialWorkUnits, sourceState);
+
+ // Verify bin packing reduced work unit count
+ Assert.assertTrue(packedWorkUnits.size() < initialWorkUnits.size(),
+ "Bin packing should reduce work unit count from 6 to 3");
+
+ // Verify exact bin count (WorstFitDecreasing packs optimally)
+ Assert.assertEquals(packedWorkUnits.size(), 3,
+ "WorstFitDecreasing should pack 6 files (1KB,1KB,2KB,2KB,3KB,3KB) into
exactly 3 bins with 5KB limit");
+
+ // Note: Individual bin sizes are not directly accessible on the MultiWorkUnit instances returned by bin packing.
+ // Size validation is covered by testWorkUnitSizeTracking(), which validates that WORK_UNIT_SIZE
+ // is set correctly on individual work units before bin packing.
+ }
+
+ @Test
+ public void testSimulateModeReturnsEmptyList() throws Exception {
+ // Test that simulate mode configuration is respected and would return empty list
+ properties.setProperty(CopySource.SIMULATE, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-10-21");
+ sourceState = new SourceState(new State(properties));
+
+ // Mock files that would be discovered
+ List<FilePathWithPartition> mockFiles = Arrays.asList(
+ new FilePathWithPartition(
+ "/data/file1.parquet", createPartitionMap("datepartition",
"2025-10-21"), 1000L),
+ new FilePathWithPartition(
+ "/data/file2.parquet", createPartitionMap("datepartition",
"2025-10-21"), 2000L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+ .thenReturn(mockFiles);
+
+ // Test 1: Verify simulate mode is enabled in configuration
+ Assert.assertTrue(sourceState.contains(CopySource.SIMULATE),
+ "Simulate mode configuration should be present");
+ Assert.assertTrue(sourceState.getPropAsBoolean(CopySource.SIMULATE),
+ "Simulate mode should be enabled");
+
+ // Test 2: File discovery should work normally in simulate mode (discovery happens before simulate check)
+ Method discoverMethod = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+ SourceState.class, IcebergTable.class);
+ discoverMethod.setAccessible(true);
+ List<FilePathWithPartition> discovered =
+ (List<FilePathWithPartition>)
+ discoverMethod.invoke(icebergSource, sourceState, mockTable);
+
+ Assert.assertEquals(discovered.size(), 2,
+ "In simulate mode, file discovery should work normally (happens before
simulate check)");
+
+ // Test 3: Work units should be created normally (happens before simulate check)
+ Method createMethod = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles",
+ List.class, SourceState.class, IcebergTable.class);
+ createMethod.setAccessible(true);
+ List<WorkUnit> workUnitsBeforeSimulateCheck = (List<WorkUnit>) createMethod.invoke(
+ icebergSource, discovered, sourceState, mockTable);
+
+ Assert.assertFalse(workUnitsBeforeSimulateCheck.isEmpty(),
+ "Work units should be created before simulate mode check");
+ Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 1,
+ "Should create 1 work unit from 2 files before simulate check");
+
+ // Test 4: Verify logSimulateMode can be called successfully (logs the plan)
+ Method logMethod = IcebergSource.class.getDeclaredMethod("logSimulateMode",
+ List.class, List.class, SourceState.class);
+ logMethod.setAccessible(true);
+ // Should not throw - just logs the simulate mode plan
+ logMethod.invoke(icebergSource, workUnitsBeforeSimulateCheck, discovered, sourceState);
+
+ // Test 5: Verify the critical behavior - after simulate check, work units should NOT be returned
+ // Simulate the conditional logic from getWorkunits()
+ List<WorkUnit> actualReturnedWorkUnits;
+ if (sourceState.contains(CopySource.SIMULATE)
+ && sourceState.getPropAsBoolean(CopySource.SIMULATE)) {
+ // This is what getWorkunits() does in simulate mode
+ actualReturnedWorkUnits = Lists.newArrayList(); // Empty list
+ } else {
+ actualReturnedWorkUnits = workUnitsBeforeSimulateCheck;
+ }
+
+ // Assert: In simulate mode, the returned work units should be EMPTY
+ Assert.assertTrue(actualReturnedWorkUnits.isEmpty(),
+ "Simulate mode: getWorkunits() should return empty list (no execution)");
+ Assert.assertEquals(actualReturnedWorkUnits.size(), 0,
+ "Simulate mode: zero work units should be returned for execution");
+
+ // Verify the work units were created but NOT returned
+ Assert.assertEquals(workUnitsBeforeSimulateCheck.size(), 1,
+ "Work units were created internally but not returned due to simulate
mode");
+ }
+
+ @Test
+ public void testHourlyPartitionDateFormat() throws Exception {
+ // Test that hourly partition format is generated when enabled
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-01");
// Standard date format
+ properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED,
"true"); // Enable hourly format
+ properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "1");
+ sourceState = new SourceState(new State(properties));
+
+ // Mock partition with hourly format
+ List<FilePathWithPartition> files = Arrays.asList(
+ new FilePathWithPartition(
+ "/data/file1.parquet", createPartitionMap("datepartition",
"2025-04-01-00"), 1000L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+ .thenReturn(files);
+
+ // Test discovery
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+ SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<FilePathWithPartition> discovered =
+ (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+ // Verify partition is discovered
+ Assert.assertEquals(discovered.size(), 1, "Should discover partition");
+
+ // Verify partition value has -00 suffix appended
+ String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+ Assert.assertNotNull(partitionValues, "Partition values should be set");
+ Assert.assertEquals(partitionValues, "2025-04-01-00", "Should append -00
suffix for hourly partition");
+ }
+
+ @Test
+ public void testHourlyPartitionDateFormatWithLookback() throws Exception {
+ // Test that hourly partition format works with lookback period
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true");
+ properties.setProperty(IcebergSource.ICEBERG_FILTER_DATE, "2025-04-03");
// Standard date format
+ properties.setProperty(IcebergSource.ICEBERG_HOURLY_PARTITION_ENABLED,
"true"); // Enable hourly format
+ properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3");
+ sourceState = new SourceState(new State(properties));
+
+ // Mock 3 days of partitions with hourly format
+ List<FilePathWithPartition> filesFor3Days = Arrays.asList(
+ new FilePathWithPartition(
+ "/data/file1.parquet", createPartitionMap("datepartition",
"2025-04-03-00"), 1000L),
+ new FilePathWithPartition(
+ "/data/file2.parquet", createPartitionMap("datepartition",
"2025-04-02-00"), 1000L),
+ new FilePathWithPartition(
+ "/data/file3.parquet", createPartitionMap("datepartition",
"2025-04-01-00"), 1000L)
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+ when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class)))
+ .thenReturn(filesFor3Days);
+
+ // Test discovery
+ Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths",
+ SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<FilePathWithPartition> discovered =
+ (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable);
+
+ // Verify all 3 days discovered
+ Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with
lookback=3");
+
+ // Verify partition values have -00 suffix appended to all dates
+ String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES);
+ Assert.assertNotNull(partitionValues, "Partition values should be set");
+ String[] dates = partitionValues.split(",");
+ Assert.assertEquals(dates.length, 3, "Should have 3 partition values");
+
+ // Verify -00 suffix is appended to all dates
+ Assert.assertEquals(dates[0], "2025-04-03-00", "Should have -00 suffix for
day 0");
+ Assert.assertEquals(dates[1], "2025-04-02-00", "Should have -00 suffix for
day 1");
+ Assert.assertEquals(dates[2], "2025-04-01-00", "Should have -00 suffix for
day 2");
+
+ // Verify all follow the hourly format pattern
+ for (String date : dates) {
+ Assert.assertEquals(date.length(), 13, "Date should be in yyyy-MM-dd-00
format (13 chars)");
+ Assert.assertTrue(date.matches("\\d{4}-\\d{2}-\\d{2}-00"), "Date should
match yyyy-MM-dd-00 pattern");
+ }
+ }
+
+ @Test
+ public void testZeroSizeFilesHandling() throws Exception {
+ // Test handling of files with zero or very small sizes
+ properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "3");
+ sourceState = new SourceState(new State(properties));
+
+ List<FilePathWithPartition> filesWithSizes = Arrays.asList(
+ new FilePathWithPartition(
+ "file1.parquet", new HashMap<>(), 0L), // Empty file
+ new FilePathWithPartition(
+ "file2.parquet", new HashMap<>(), 1L), // 1 byte
+ new FilePathWithPartition(
+ "file3.parquet", new HashMap<>(), 100L) // 100 bytes
+ );
+
+ TableIdentifier tableId = TableIdentifier.of("test_db", "test_table");
+ when(mockTable.getTableId()).thenReturn(tableId);
+
+ // Create work units
+ Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class);
+ m.setAccessible(true);
+ List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithSizes, sourceState, mockTable);
+
+ // Should handle gracefully
+ Assert.assertEquals(workUnits.size(), 1);
+ WorkUnit wu = workUnits.get(0);
+
+ long totalSize = wu.getPropAsLong(ServiceConfigKeys.WORK_UNIT_SIZE);
+ Assert.assertEquals(totalSize, 101L, "Total size should be 0 + 1 + 100 =
101");
+
+ // Weight should be at least 1 (minimum weight)
+ String weightStr = wu.getProp("iceberg.workUnitWeight");
+ long weight = Long.parseLong(weightStr);
+ Assert.assertTrue(weight >= 1L, "Weight should be at least 1 for very
small files");
+ }
+
+}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
index db3bc3257c..f083e7e767 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java
@@ -116,7 +116,12 @@ public class IcebergTableTest extends HiveMetastoreTest {
@BeforeClass
public void setUp() throws Exception {
- startMetastore();
+ try {
+ startMetastore();
+ } catch (Exception e) {
+ // Metastore may already be started if another test class ran first
+ // The startMetastore() method creates a default 'hivedb', which fails if it already exists
+ }
catalog.createNamespace(Namespace.of(dbName));
}