Copilot commented on code in PR #4146: URL: https://github.com/apache/gobblin/pull/4146#discussion_r2476668550
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java: ########## @@ -0,0 +1,556 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.google.common.base.Optional; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.source.extractor.Extractor; +import 
org.apache.gobblin.source.extractor.filebased.FileBasedHelperException; +import org.apache.gobblin.source.extractor.filebased.FileBasedSource; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.source.workunit.WorkUnitWeighter; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.binpacking.FieldWeighter; +import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +/** + * Unified Iceberg source that supports partition-based data copying from Iceberg tables. + * + * This source reads job configuration, applies date partition filters with optional lookback period, + * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files + * into work units for parallel processing. + * + * <pre> + * # Basic configuration + * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource + * iceberg.database.name=db1 + * iceberg.table.name=table1 + * iceberg.catalog.uri=https://openhouse.com/catalog + * + * # Partition filtering with lookback + * iceberg.filter.enabled=true + * iceberg.filter.expr=datepartition=2025-04-01 + * iceberg.lookback.days=3 + * </pre> + */ +@Slf4j +public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> { + + public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name"; + public static final String ICEBERG_TABLE_NAME = "iceberg.table.name"; + public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri"; + public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class"; + public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog"; + public static final String ICEBERG_RECORD_PROCESSING_ENABLED = 
"iceberg.record.processing.enabled"; + public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false; + public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit"; + public static final int DEFAULT_FILES_PER_WORKUNIT = 10; + public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled"; + public static final String ICEBERG_FILTER_EXPR = "iceberg.filter.expr"; // e.g., datepartition=2025-04-01 + public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days"; + public static final int DEFAULT_LOOKBACK_DAYS = 1; + public static final String ICEBERG_DATE_PARTITION_KEY = "datepartition"; // date partition key name + public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key"; + public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values"; + public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path"; + public static final String ICEBERG_SIMULATE = "iceberg.simulate"; + public static final String ICEBERG_MAX_SIZE_MULTI_WORKUNITS = "iceberg.binPacking.maxSizePerBin"; + public static final String ICEBERG_MAX_WORK_UNITS_PER_BIN = "iceberg.binPacking.maxWorkUnitsPerBin"; + private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight"; + + private Optional<LineageInfo> lineageInfo; + private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT); + + /** + * Initialize file system helper based on mode (streaming vs record processing) + */ + @Override + public void initFileSystemHelper(State state) throws FileBasedHelperException { + // For file streaming mode, we use IcebergFileStreamHelper + // For record processing mode, we'll use a different helper (future implementation) + boolean recordProcessingEnabled = state.getPropAsBoolean( + ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED); + + if (recordProcessingEnabled) { + // Future: Initialize helper for record processing + throw new 
UnsupportedOperationException("Record processing mode not yet implemented. " + + "This will be added when SQL/Data Lake destinations are required."); + } else { + // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper + this.fsHelper = new IcebergFileStreamHelper(state); + this.fsHelper.connect(); + } + } + + /** + * Get work units by discovering files from Iceberg table + * @param state is the source state + * @return List<WorkUnit> list of work units + */ + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); + + try { + initFileSystemHelper(state); + + validateConfiguration(state); + + IcebergCatalog catalog = createCatalog(state); + String database = state.getProp(ICEBERG_DATABASE_NAME); + String table = state.getProp(ICEBERG_TABLE_NAME); + + IcebergTable icebergTable = catalog.openTable(database, table); + Preconditions.checkArgument(catalog.tableAlreadyExists(icebergTable), + String.format("OpenHouse table not found: %s.%s", database, table)); + + List<IcebergTable.FilePathWithPartition> filesWithPartitions = discoverPartitionFilePaths(state, icebergTable); + log.info("Discovered {} files from Iceberg table {}.{}", filesWithPartitions.size(), database, table); + + // Create work units from discovered files + List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, state, icebergTable); + + // Handle simulate mode - log what would be copied without executing + if (state.contains(ICEBERG_SIMULATE) && state.getPropAsBoolean(ICEBERG_SIMULATE)) { + log.info("Simulate mode enabled. Will not execute the copy."); + logSimulateMode(workUnits, filesWithPartitions, state); + return Lists.newArrayList(); + } + + // Apply bin packing to work units if configured + List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, state); + log.info("Work unit creation complete. 
Initial work units: {}, packed work units: {}", + workUnits.size(), packedWorkUnits.size()); + + return Lists.newArrayList(packedWorkUnits); + + } catch (Exception e) { + log.error("Failed to create work units for Iceberg table", e); + throw new RuntimeException("Failed to create work units", e); + } + } + + /** + * Get extractor based on mode (streaming vs record processing) + * + * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned {@link Extractor} + * @return + * @throws IOException + */ + @Override + public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException { + boolean recordProcessingEnabled = state.getPropAsBoolean( + ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED); + + if (recordProcessingEnabled) { + // Return record processing extractor + throw new UnsupportedOperationException("Record processing mode not yet implemented."); + } else { + // Return file streaming extractor + return new IcebergFileStreamExtractor(state); + } + } + + /** + * Discover partition data files using Iceberg TableScan API with optional lookback for date partitions. + * + * <p>This method supports two modes: + * <ol> + * <li><b>Full table scan</b>: When {@code iceberg.filter.enabled=false}, returns all data files from current snapshot</li> + * <li><b>Partition filter</b>: When {@code iceberg.filter.enabled=true}, uses Iceberg TableScan with partition + * filter and applies lookback period for date partitions</li> + * </ol> + * + * <p>For date partitions (partition key = {@value #ICEBERG_DATE_PARTITION_KEY}), the lookback period allows copying data for the last N days. 
+ * For example, with {@code iceberg.filter.expr=datepartition=2025-04-03} and {@code iceberg.lookback.days=3}, + * this will discover files for partitions: datepartition=2025-04-03, datepartition=2025-04-02, datepartition=2025-04-01 + * + * @param state source state containing filter configuration + * @param icebergTable the Iceberg table to scan + * @return list of file paths with partition metadata matching the filter criteria + * @throws IOException if table scan or file discovery fails + */ + private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws IOException { + boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true); + + if (!filterEnabled) { + log.info("Partition filter disabled, discovering all data files from current snapshot"); + IcebergSnapshotInfo snapshot = icebergTable.getCurrentSnapshotInfo(); + List<IcebergTable.FilePathWithPartition> result = Lists.newArrayList(); + for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshot.getManifestFiles()) { + for (String filePath : mfi.getListedFilePaths()) { + result.add(new IcebergTable.FilePathWithPartition(filePath, Maps.newHashMap())); + } + } + log.info("Discovered {} data files from snapshot", result.size()); + return result; + } + + // Parse filter expression + String expr = state.getProp(ICEBERG_FILTER_EXPR); + Preconditions.checkArgument(!StringUtils.isBlank(expr), + "iceberg.filter.expr is required when iceberg.filter.enabled=true"); + String[] parts = expr.split("=", 2); + Preconditions.checkArgument(parts.length == 2, + "Invalid iceberg.filter.expr. 
Expected key=value, got: %s", expr); + String key = parts[0].trim(); + String value = parts[1].trim(); + + // Apply lookback period for date partitions + // lookbackDays=1 (default) means copy only the specified date + // lookbackDays=3 means copy specified date + 2 previous days (total 3 days) + int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS); + List<String> values = Lists.newArrayList(); + + if (ICEBERG_DATE_PARTITION_KEY.equals(key) && lookbackDays >= 1) { + log.info("Applying lookback period of {} days for date partition: {}", lookbackDays, value); + LocalDate start = LocalDate.parse(value); + for (int i = 0; i < lookbackDays; i++) { + String partitionValue = start.minusDays(i).toString(); + values.add(partitionValue); + log.debug("Including partition: {}={}", ICEBERG_DATE_PARTITION_KEY, partitionValue); + } + } else { + log.error("Partition key is not correct or lookbackDays < 1, skipping lookback. Input: {}={}, expected: {}=<date>", + key, value, ICEBERG_DATE_PARTITION_KEY); Review Comment: The log level 'error' is misleading since the code throws an IllegalArgumentException immediately after. This should be 'log.debug' or removed entirely, as the exception message provides the necessary information. ```suggestion ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Collections; +import java.util.Iterator; + +import com.google.common.base.Optional; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor; +import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException; +import org.apache.gobblin.util.WriterUtils; + +/** + * Extractor for file streaming mode that creates FileAwareInputStream for each file. + * + * This extractor is used when {@code iceberg.record.processing.enabled=false} to stream + * OpenHouse table files as binary data to destinations like Azure, HDFS</p> + * + * Each "record" is a {@link FileAwareInputStream} representing one file from + * the OpenHouse table. The downstream writer handles streaming the file content. Review Comment: Replaced 'OpenHouse' with 'Iceberg' for consistency.
The class is IcebergFileStreamExtractor, so its Javadoc should refer to Iceberg tables rather than OpenHouse specifically. ```suggestion * Iceberg table files as binary data to destinations like Azure, HDFS</p> * * Each "record" is a {@link FileAwareInputStream} representing one file from * the Iceberg table. The downstream writer handles streaming the file content. ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java: ########## @@ -0,0 +1,556 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.google.common.base.Optional; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException; +import org.apache.gobblin.source.extractor.filebased.FileBasedSource; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.source.workunit.WorkUnitWeighter; +import org.apache.gobblin.util.HadoopUtils; +import org.apache.gobblin.util.binpacking.FieldWeighter; +import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +/** + * Unified Iceberg source that supports partition-based data copying from Iceberg tables. + * + * This source reads job configuration, applies date partition filters with optional lookback period, + * and uses Iceberg's TableScan API to enumerate data files for specific partitions. 
It groups files + * into work units for parallel processing. + * + * <pre> + * # Basic configuration + * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource + * iceberg.database.name=db1 + * iceberg.table.name=table1 + * iceberg.catalog.uri=https://openhouse.com/catalog + * + * # Partition filtering with lookback + * iceberg.filter.enabled=true + * iceberg.filter.expr=datepartition=2025-04-01 + * iceberg.lookback.days=3 + * </pre> + */ +@Slf4j +public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> { + + public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name"; + public static final String ICEBERG_TABLE_NAME = "iceberg.table.name"; + public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri"; + public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class"; + public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog"; + public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled"; + public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false; + public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit"; + public static final int DEFAULT_FILES_PER_WORKUNIT = 10; + public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled"; + public static final String ICEBERG_FILTER_EXPR = "iceberg.filter.expr"; // e.g., datepartition=2025-04-01 + public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days"; + public static final int DEFAULT_LOOKBACK_DAYS = 1; + public static final String ICEBERG_DATE_PARTITION_KEY = "datepartition"; // date partition key name + public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key"; + public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values"; + public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path"; + public 
static final String ICEBERG_SIMULATE = "iceberg.simulate"; + public static final String ICEBERG_MAX_SIZE_MULTI_WORKUNITS = "iceberg.binPacking.maxSizePerBin"; + public static final String ICEBERG_MAX_WORK_UNITS_PER_BIN = "iceberg.binPacking.maxWorkUnitsPerBin"; + private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight"; + + private Optional<LineageInfo> lineageInfo; + private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT); + + /** + * Initialize file system helper based on mode (streaming vs record processing) + */ + @Override + public void initFileSystemHelper(State state) throws FileBasedHelperException { + // For file streaming mode, we use IcebergFileStreamHelper + // For record processing mode, we'll use a different helper (future implementation) + boolean recordProcessingEnabled = state.getPropAsBoolean( + ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED); + + if (recordProcessingEnabled) { + // Future: Initialize helper for record processing + throw new UnsupportedOperationException("Record processing mode not yet implemented. 
" + + "This will be added when SQL/Data Lake destinations are required."); + } else { + // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper + this.fsHelper = new IcebergFileStreamHelper(state); + this.fsHelper.connect(); + } + } + + /** + * Get work units by discovering files from Iceberg table + * @param state is the source state + * @return List<WorkUnit> list of work units + */ + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); + + try { + initFileSystemHelper(state); + + validateConfiguration(state); + + IcebergCatalog catalog = createCatalog(state); + String database = state.getProp(ICEBERG_DATABASE_NAME); + String table = state.getProp(ICEBERG_TABLE_NAME); + + IcebergTable icebergTable = catalog.openTable(database, table); + Preconditions.checkArgument(catalog.tableAlreadyExists(icebergTable), + String.format("OpenHouse table not found: %s.%s", database, table)); Review Comment: Error message refers to 'OpenHouse table' but should say 'Iceberg table' for consistency and accuracy, as this is a generic Iceberg source implementation. ```suggestion String.format("Iceberg table not found: %s.%s", database, table)); ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java: ########## @@ -344,4 +351,119 @@ protected void updateSchema(Schema updatedSchema, boolean onlyValidate) throws T } } + /** + * Container for file path, partition information, and file size. + */ + public static class FilePathWithPartition { + private final String filePath; + private final Map<String, String> partitionData; + private final long fileSize; + + public FilePathWithPartition(String filePath, Map<String, String> partitionData) { + this(filePath, partitionData, 0L); Review Comment: [nitpick] Using 0L as a default file size may cause issues for bin packing and size-based work unit distribution. 
Consider documenting this limitation or using a sentinel value like -1L to indicate 'size unknown'. ```suggestion * <p> * If the file size is unknown, use -1L as a sentinel value. */ public static class FilePathWithPartition { private final String filePath; private final Map<String, String> partitionData; private final long fileSize; /** * Constructs a FilePathWithPartition with unknown file size. * The file size will be set to -1L to indicate "unknown". */ public FilePathWithPartition(String filePath, Map<String, String> partitionData) { this(filePath, partitionData, -1L); ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamExtractor.java: ########## @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.Collections; +import java.util.Iterator; + +import com.google.common.base.Optional; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.copy.CopyConfiguration; +import org.apache.gobblin.data.management.copy.CopyableFile; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.source.extractor.filebased.FileBasedExtractor; +import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException; +import org.apache.gobblin.util.WriterUtils; + +/** + * Extractor for file streaming mode that creates FileAwareInputStream for each file. + * + * This extractor is used when {@code iceberg.record.processing.enabled=false} to stream + * OpenHouse table files as binary data to destinations like Azure, HDFS</p> Review Comment: Incomplete HTML tag in JavaDoc. The closing `</p>` tag appears without an opening `<p>` tag. Either add `<p>` before 'This extractor' or remove the closing tag. ```suggestion * OpenHouse table files as binary data to destinations like Azure, HDFS ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergFileStreamHelper.java: ########## @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.io.InputStream; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Lists; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException; +import org.apache.gobblin.source.extractor.filebased.TimestampAwareFileBasedHelper; + +/** + * File-based helper for Iceberg file streaming operations. + * + * This helper supports file streaming mode where OpenHouse table files Review Comment: Corrected reference from 'OpenHouse table files' to 'Iceberg table files' for consistency with the class name and broader applicability. ```suggestion * This helper supports file streaming mode where Iceberg table files ``` ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java: ########## @@ -0,0 +1,556 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; +import java.net.URI; +import java.time.LocalDate; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import com.google.common.base.Optional; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.dataset.DatasetConstants; +import org.apache.gobblin.dataset.DatasetDescriptor; +import org.apache.gobblin.metrics.event.lineage.LineageInfo; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException; +import org.apache.gobblin.source.extractor.filebased.FileBasedSource; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.source.workunit.WorkUnitWeighter; +import org.apache.gobblin.util.HadoopUtils; +import 
org.apache.gobblin.util.binpacking.FieldWeighter; +import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +/** + * Unified Iceberg source that supports partition-based data copying from Iceberg tables. + * + * This source reads job configuration, applies date partition filters with optional lookback period, + * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files + * into work units for parallel processing. + * + * <pre> + * # Basic configuration + * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource + * iceberg.database.name=db1 + * iceberg.table.name=table1 + * iceberg.catalog.uri=https://openhouse.com/catalog + * + * # Partition filtering with lookback + * iceberg.filter.enabled=true + * iceberg.filter.expr=datepartition=2025-04-01 + * iceberg.lookback.days=3 + * </pre> + */ +@Slf4j +public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> { + + public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name"; + public static final String ICEBERG_TABLE_NAME = "iceberg.table.name"; + public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri"; + public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class"; + public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog"; + public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled"; + public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false; + public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit"; + public static final int DEFAULT_FILES_PER_WORKUNIT = 10; + public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled"; + public static final String 
ICEBERG_FILTER_EXPR = "iceberg.filter.expr"; // e.g., datepartition=2025-04-01 + public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days"; + public static final int DEFAULT_LOOKBACK_DAYS = 1; + public static final String ICEBERG_DATE_PARTITION_KEY = "datepartition"; // date partition key name + public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key"; + public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values"; + public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path"; + public static final String ICEBERG_SIMULATE = "iceberg.simulate"; + public static final String ICEBERG_MAX_SIZE_MULTI_WORKUNITS = "iceberg.binPacking.maxSizePerBin"; + public static final String ICEBERG_MAX_WORK_UNITS_PER_BIN = "iceberg.binPacking.maxWorkUnitsPerBin"; + private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight"; + + private Optional<LineageInfo> lineageInfo; + private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT); + + /** + * Initialize file system helper based on mode (streaming vs record processing) + */ + @Override + public void initFileSystemHelper(State state) throws FileBasedHelperException { + // For file streaming mode, we use IcebergFileStreamHelper + // For record processing mode, we'll use a different helper (future implementation) + boolean recordProcessingEnabled = state.getPropAsBoolean( + ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED); + + if (recordProcessingEnabled) { + // Future: Initialize helper for record processing + throw new UnsupportedOperationException("Record processing mode not yet implemented. 
" + + "This will be added when SQL/Data Lake destinations are required."); + } else { + // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper + this.fsHelper = new IcebergFileStreamHelper(state); + this.fsHelper.connect(); + } + } + + /** + * Get work units by discovering files from Iceberg table + * @param state is the source state + * @return List<WorkUnit> list of work units + */ + @Override + public List<WorkUnit> getWorkunits(SourceState state) { + this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker()); + + try { + initFileSystemHelper(state); + + validateConfiguration(state); + + IcebergCatalog catalog = createCatalog(state); + String database = state.getProp(ICEBERG_DATABASE_NAME); + String table = state.getProp(ICEBERG_TABLE_NAME); + + IcebergTable icebergTable = catalog.openTable(database, table); + Preconditions.checkArgument(catalog.tableAlreadyExists(icebergTable), + String.format("OpenHouse table not found: %s.%s", database, table)); Review Comment: The method 'tableAlreadyExists' is called after 'openTable'. If the table doesn't exist, 'openTable' would likely fail before this check. Consider reordering to check existence before attempting to open the table, or remove this redundant check. ```suggestion Preconditions.checkArgument(catalog.tableAlreadyExists(database, table), String.format("OpenHouse table not found: %s.%s", database, table)); IcebergTable icebergTable = catalog.openTable(database, table); ``` ########## gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSourceTest.java: ########## @@ -0,0 +1,1033 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +import com.google.common.collect.Lists; + +import org.apache.gobblin.broker.SharedResourcesBrokerFactory; +import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.data.management.copy.FileAwareInputStream; +import org.apache.gobblin.data.management.copy.iceberg.IcebergTable.FilePathWithPartition; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import 
org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + +import java.util.HashMap; +import java.util.Map; + +/** + * Unit tests for {@link IcebergSource}. + */ +public class IcebergSourceTest { + + @Mock + private IcebergCatalog mockCatalog; + + @Mock + private IcebergTable mockTable; + + @Mock + private IcebergSnapshotInfo mockSnapshot; + + private IcebergSource icebergSource; + private SourceState sourceState; + private Properties properties; + + private AutoCloseable mocks; + + + @BeforeMethod + public void setUp() throws Exception { + mocks = MockitoAnnotations.openMocks(this); + + // Initialize IcebergSource + this.icebergSource = new IcebergSource(); + + // Set up basic properties + this.properties = new Properties(); + properties.setProperty(IcebergSource.ICEBERG_DATABASE_NAME, "test_db"); + properties.setProperty(IcebergSource.ICEBERG_TABLE_NAME, "test_table"); + properties.setProperty(IcebergSource.ICEBERG_CATALOG_URI, "https://openhouse.test.com/api/v1"); + properties.setProperty(IcebergSource.ICEBERG_CATALOG_CLASS, "org.apache.gobblin.data.management.copy.iceberg.OpenHouseCatalog"); + + // Create SourceState + this.sourceState = new SourceState(new State(properties)); + // Set a default top-level broker required by LineageInfo + com.typesafe.config.Config emptyConfig = com.typesafe.config.ConfigFactory.empty(); + this.sourceState.setBroker( + SharedResourcesBrokerFactory.createDefaultTopLevelBroker( + emptyConfig, + GobblinScopeTypes.GLOBAL.defaultScopeInstance())); + } + + @AfterMethod + public void 
tearDown() throws Exception { + if (mocks != null) { + mocks.close(); + } + } + + @Test + public void testFileStreamingModeWorkUnitCreation() throws Exception { + // Set up file streaming mode + properties.setProperty(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, "false"); + + // Mock file discovery via snapshot fallback (no filter) + List<String> inputPaths = Arrays.asList( + "/data/warehouse/test_table/data/file1.parquet", + "/data/warehouse/test_table/data/file2.parquet", + "/data/warehouse/test_table/data/file3.parquet", + "/data/warehouse/test_table/metadata/manifest-list.avro", + "/data/warehouse/test_table/metadata/manifest1.avro" + ); + setupMockFileDiscovery(inputPaths); + + // Discover data-only paths via snapshot manifest info + List<String> dataFilePaths = inputPaths.stream() + .filter(p -> p.endsWith(".parquet") || p.endsWith(".orc") || p.endsWith(".avro")) + .filter(p -> !p.contains("manifest")) + .collect(Collectors.toList()); + + // Convert to FilePathWithPartition (no partition info for snapshot-based discovery) + List<FilePathWithPartition> filesWithPartitions = + dataFilePaths.stream() + .map(path -> new FilePathWithPartition( + path, new java.util.HashMap<>())) + .collect(Collectors.toList()); + + // Invoke private createWorkUnitsFromFiles via reflection + Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class); + m.setAccessible(true); + List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithPartitions, sourceState, mockTable); + + // Verify single work unit contains all 3 data files by default (filesPerWorkUnit default=10) + Assert.assertEquals(workUnits.size(), 1, "Should create 1 work unit"); + WorkUnit wu = workUnits.get(0); + String filesToPull = wu.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL); + Assert.assertNotNull(filesToPull); + Assert.assertEquals(filesToPull.split(",").length, 3); + + // Verify extract info + 
Assert.assertEquals(wu.getExtract().getNamespace(), "iceberg"); + Assert.assertEquals(wu.getExtract().getTable(), "test_table"); + Assert.assertEquals(wu.getExtract().getType(), Extract.TableType.SNAPSHOT_ONLY); + } + + @Test + public void testFileStreamingModeExtractorSelection() throws Exception { + // Set up file streaming mode + WorkUnit dummyWu = WorkUnit.createEmpty(); + State jobState = new State(properties); + WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState); + workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, "false"); + + Extractor<String, FileAwareInputStream> extractor = icebergSource.getExtractor(workUnitState); + // Verify correct extractor type + Assert.assertTrue(extractor instanceof IcebergFileStreamExtractor, + "File streaming mode should return IcebergFileStreamExtractor"); + } + + @Test + public void testRecordProcessingExtractorThrows() throws Exception { + // Set up record processing mode + WorkUnit dummyWu = WorkUnit.createEmpty(); + State jobState = new State(properties); + WorkUnitState workUnitState = new WorkUnitState(dummyWu, jobState); + workUnitState.setProp(IcebergSource.ICEBERG_RECORD_PROCESSING_ENABLED, "true"); + + try { + icebergSource.getExtractor(workUnitState); + Assert.fail("Expected UnsupportedOperationException for record processing mode"); + } catch (UnsupportedOperationException expected) { + // Expected exception + } + } + + @Test + public void testConfigurationValidation() throws Exception { + // Test missing database name via direct validateConfiguration method + properties.remove(IcebergSource.ICEBERG_DATABASE_NAME); + sourceState = new SourceState(new State(properties)); + + // Use reflection to call private validateConfiguration method + Method m = IcebergSource.class.getDeclaredMethod("validateConfiguration", SourceState.class); + m.setAccessible(true); + + try { + m.invoke(icebergSource, sourceState); + Assert.fail("Should throw exception for missing database name"); + } catch 
(java.lang.reflect.InvocationTargetException e) { + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException); + Assert.assertTrue(e.getCause().getMessage().contains("iceberg.database.name is required")); + } + } + + @Test + public void testFileGrouping() throws Exception { + // Test with more files than files per work unit + properties.setProperty(IcebergSource.ICEBERG_FILES_PER_WORKUNIT, "3"); + sourceState = new SourceState(new State(properties)); + + // Mock 6 files to test grouping + List<String> dataFilePaths = Arrays.asList( + "file1.parquet", "file2.parquet", "file3.parquet", + "file4.parquet", "file5.parquet", "file6.parquet" + ); + + // Convert to FilePathWithPartition + List<FilePathWithPartition> filesWithPartitions = + dataFilePaths.stream() + .map(path -> new FilePathWithPartition( + path, new java.util.HashMap<>())) + .collect(Collectors.toList()); + + // Setup table mock + TableIdentifier tableId = TableIdentifier.of("test_db", "test_table"); + when(mockTable.getTableId()).thenReturn(tableId); + + // Use reflection to call createWorkUnitsFromFiles directly on data files + Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class); + m.setAccessible(true); + List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithPartitions, sourceState, mockTable); + + // Should create 2 work units: [3 files], [3 files] + Assert.assertEquals(workUnits.size(), 2, "Should create 2 work units for 6 files with files.per.workunit=3"); + + // Verify file distribution + int totalFiles = 0; + for (WorkUnit workUnit : workUnits) { + String filesToPull = workUnit.getProp(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL); + int filesInThisUnit = filesToPull.split(",").length; + totalFiles += filesInThisUnit; + Assert.assertTrue(filesInThisUnit <= 3, "No work unit should have more than 3 files"); + } + Assert.assertEquals(totalFiles, 6, "Total files across all work units should 
be 6"); + } + + /** + * Helper method to set up mock file discovery + */ + private void setupMockFileDiscovery(List<String> filePaths) throws Exception { + TableIdentifier tableId = TableIdentifier.of("test_db", "test_table"); + when(mockTable.getTableId()).thenReturn(tableId); + when(mockCatalog.openTable("test_db", "test_table")).thenReturn(mockTable); + when(mockCatalog.tableAlreadyExists(mockTable)).thenReturn(true); + when(mockTable.getCurrentSnapshotInfo()).thenReturn(mockSnapshot); + + // Set up snapshot to return the specified file paths + when(mockSnapshot.getManifestListPath()).thenReturn("manifest-list.avro"); + when(mockSnapshot.getMetadataPath()).thenReturn(Optional.empty()); + + // Create manifest info with data files + List<String> dataFiles = filePaths.stream() + .filter(path -> path.endsWith(".parquet") || path.endsWith(".orc") || path.endsWith(".avro")) + .filter(path -> !path.contains("manifest")) + .collect(Collectors.toList()); + + IcebergSnapshotInfo.ManifestFileInfo manifestInfo = new IcebergSnapshotInfo.ManifestFileInfo( + "manifest1.avro", dataFiles); + when(mockSnapshot.getManifestFiles()).thenReturn(Arrays.asList(manifestInfo)); + } + + @Test + public void testLookbackPeriodLogic() throws Exception { + // Test that lookback period correctly discovers multiple days of partitions + properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true"); + properties.setProperty(IcebergSource.ICEBERG_FILTER_EXPR, "datepartition=2025-04-03"); + properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3"); + sourceState = new SourceState(new State(properties)); + + // Mock 3 days of partitions + List<FilePathWithPartition> filesFor3Days = Arrays.asList( + new FilePathWithPartition( + "/data/file1.parquet", createPartitionMap("datepartition", "2025-04-03"), 1000L), + new FilePathWithPartition( + "/data/file2.parquet", createPartitionMap("datepartition", "2025-04-02"), 1000L), + new FilePathWithPartition( + "/data/file3.parquet", 
createPartitionMap("datepartition", "2025-04-01"), 1000L) + ); + + TableIdentifier tableId = TableIdentifier.of("test_db", "test_table"); + when(mockTable.getTableId()).thenReturn(tableId); + when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))) + .thenReturn(filesFor3Days); + + // Test discovery + Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", + SourceState.class, IcebergTable.class); + m.setAccessible(true); + List<FilePathWithPartition> discovered = + (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable); + + // Verify all 3 days discovered + Assert.assertEquals(discovered.size(), 3, "Should discover 3 days with lookback=3"); + + // Verify partition values are set correctly + String partitionValues = sourceState.getProp(IcebergSource.ICEBERG_PARTITION_VALUES); + Assert.assertNotNull(partitionValues, "Partition values should be set"); + // Should contain 3 dates: 2025-04-03, 2025-04-02, 2025-04-01 + String[] dates = partitionValues.split(","); + Assert.assertEquals(dates.length, 3, "Should have 3 partition values"); + } + + @Test + public void testNonDatePartitionKeyFails() throws Exception { + // Test that non-date partition keys throw proper exception + properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true"); + properties.setProperty(IcebergSource.ICEBERG_FILTER_EXPR, "region=US"); // Non-date partition + sourceState = new SourceState(new State(properties)); + + Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", + SourceState.class, IcebergTable.class); + m.setAccessible(true); + + try { + m.invoke(icebergSource, sourceState, mockTable); + Assert.fail("Should throw exception for non-date partition key"); + } catch (java.lang.reflect.InvocationTargetException e) { + // Unwrap the exception from reflection + Assert.assertTrue(e.getCause() instanceof IllegalArgumentException, + "Should throw IllegalArgumentException for non-date partition"); + 
Assert.assertTrue(e.getCause().getMessage().contains("datepartition"), + "Error message should mention expected partition key 'datepartition'"); + } + } + + @Test + public void testPartitionFilterConfiguration() throws Exception { + // Test with partition filtering enabled + properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true"); + properties.setProperty(IcebergSource.ICEBERG_FILTER_EXPR, "datepartition=2025-04-01"); + properties.setProperty(IcebergSource.ICEBERG_LOOKBACK_DAYS, "3"); + sourceState = new SourceState(new State(properties)); + + // Mock partition-aware file discovery with FilePathWithPartition + List<FilePathWithPartition> partitionFiles = Arrays.asList( + new FilePathWithPartition( + "/data/uuid1.parquet", createPartitionMap("datepartition", "2025-04-01")), + new FilePathWithPartition( + "/data/uuid2.parquet", createPartitionMap("datepartition", "2025-04-01")), + new FilePathWithPartition( + "/data/uuid3.parquet", createPartitionMap("datepartition", "2025-03-31")), + new FilePathWithPartition( + "/data/uuid4.parquet", createPartitionMap("datepartition", "2025-03-30")) + ); + + // Mock the table to return partition-specific files with metadata + TableIdentifier tableId = TableIdentifier.of("test_db", "test_table"); + when(mockTable.getTableId()).thenReturn(tableId); + when(mockTable.getFilePathsWithPartitionsForFilter(any(Expression.class))) + .thenReturn(partitionFiles); + + // Use reflection to test discoverPartitionFilePaths + Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", SourceState.class, IcebergTable.class); + m.setAccessible(true); + List<FilePathWithPartition> discoveredFiles = + (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable); + + // Verify partition filter was applied + Assert.assertEquals(discoveredFiles.size(), 4, "Should discover files from filtered partitions"); + } + + @Test + public void testPartitionInfoPropagation() throws Exception { + // Test that 
partition info is propagated to work units + properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "true"); + properties.setProperty(IcebergSource.ICEBERG_FILTER_EXPR, "datepartition=2025-04-01"); + sourceState = new SourceState(new State(properties)); + + List<FilePathWithPartition> filesWithPartitions = Arrays.asList( + new FilePathWithPartition( + "/data/uuid1.parquet", createPartitionMap("datepartition", "2025-04-01")), + new FilePathWithPartition( + "/data/uuid2.parquet", createPartitionMap("datepartition", "2025-04-01")) + ); + + TableIdentifier tableId = TableIdentifier.of("test_db", "test_table"); + when(mockTable.getTableId()).thenReturn(tableId); + + // Set partition info on source state (simulating discoverPartitionFilePaths behavior) + sourceState.setProp(IcebergSource.ICEBERG_PARTITION_KEY, "datepartition"); + sourceState.setProp(IcebergSource.ICEBERG_PARTITION_VALUES, "2025-04-01"); + + // Invoke createWorkUnitsFromFiles + Method m = IcebergSource.class.getDeclaredMethod("createWorkUnitsFromFiles", List.class, SourceState.class, IcebergTable.class); + m.setAccessible(true); + List<WorkUnit> workUnits = (List<WorkUnit>) m.invoke(icebergSource, filesWithPartitions, sourceState, mockTable); + + // Verify partition info is in work unit + Assert.assertEquals(workUnits.size(), 1); + WorkUnit wu = workUnits.get(0); + Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_KEY), "datepartition"); + Assert.assertEquals(wu.getProp(IcebergSource.ICEBERG_PARTITION_VALUES), "2025-04-01"); + + // Verify partition path mapping is stored + Assert.assertNotNull(wu.getProp(IcebergSource.ICEBERG_FILE_PARTITION_PATH), + "Partition path mapping should be stored in work unit"); + } + + @Test + public void testNoFilterFallbackToSnapshot() throws Exception { + // Test that when filter is disabled, it falls back to snapshot-based discovery + properties.setProperty(IcebergSource.ICEBERG_FILTER_ENABLED, "false"); + sourceState = new SourceState(new 
State(properties)); + + List<String> snapshotFiles = Arrays.asList( + "/data/file1.parquet", + "/data/file2.parquet", + "/data/file3.parquet" + ); + + setupMockFileDiscovery(snapshotFiles); + + // Use reflection to test discoverPartitionFilePaths + Method m = IcebergSource.class.getDeclaredMethod("discoverPartitionFilePaths", SourceState.class, IcebergTable.class); + m.setAccessible(true); + List<String> discoveredFiles = (List<String>) m.invoke(icebergSource, sourceState, mockTable); Review Comment: Type mismatch: discoverPartitionFilePaths returns List<FilePathWithPartition> but this test expects List<String>. Because of type erasure the unchecked cast itself succeeds, but a ClassCastException is thrown at runtime as soon as an element of the list is used as a String. ```suggestion List<FilePathWithPartition> discoveredFileObjs = (List<FilePathWithPartition>) m.invoke(icebergSource, sourceState, mockTable); List<String> discoveredFiles = discoveredFileObjs.stream() .map(fpp -> fpp.filePath) .collect(Collectors.toList()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
