Blazer-007 commented on code in PR #4146:
URL: https://github.com/apache/gobblin/pull/4146#discussion_r2472434545
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
+ *
+ * This source reads job configuration, applies date partition filters with optional lookback period,
+ * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files
+ * into work units for parallel processing.
+ *
+ * <pre>
+ * # Basic configuration
+ * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource
+ * iceberg.database.name=db1
+ * iceberg.table.name=table1
+ * iceberg.catalog.uri=https://openhouse.com/catalog
+ *
+ * # Partition filtering with lookback
+ * iceberg.filter.enabled=true
+ * iceberg.filter.expr=datepartition=2025-04-01
+ * iceberg.lookback.days=3

Review Comment:
With this type of contract, users will not be able to schedule a flow on GaaS directly, since we are expecting explicit partition values as well.
We should instead allow a pattern to be used here, by creating a new class `DateTimePartitionedIcebergSource extends IcebergSource`.
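Something along these lines would let the flow config carry a date pattern that is resolved at execution time instead of a hard-coded partition value. This is only a rough sketch: the `iceberg.date.partition.pattern` key, the default pattern, and the override point are placeholders I made up for illustration, not agreed-upon names.

```java
package org.apache.gobblin.data.management.copy.iceberg;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.source.workunit.WorkUnit;

/**
 * Resolves the date partition filter from the execution date, so recurring GaaS flows
 * do not need a literal partition value baked into the job config.
 */
public class DateTimePartitionedIcebergSource extends IcebergSource {
  // Hypothetical config key, for illustration only.
  public static final String ICEBERG_DATE_PARTITION_PATTERN = "iceberg.date.partition.pattern";
  public static final String DEFAULT_DATE_PARTITION_PATTERN = "yyyy-MM-dd";

  @Override
  public List<WorkUnit> getWorkunits(SourceState state) {
    // Turn the configured pattern into a concrete partition value at runtime,
    // then delegate to the existing partition-filter flow in IcebergSource.
    String pattern = state.getProp(ICEBERG_DATE_PARTITION_PATTERN, DEFAULT_DATE_PARTITION_PATTERN);
    String partitionValue = LocalDate.now().format(DateTimeFormatter.ofPattern(pattern));
    state.setProp(ICEBERG_FILTER_EXPR, ICEBERG_DATE_PARTITION_KEY + "=" + partitionValue);
    return super.getWorkunits(state);
  }
}
```

A scheduled flow would then only set the pattern (plus the lookback), and every run picks up its own date.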
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -0,0 +1,556 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
+ *
+ * This source reads job configuration, applies date partition filters with optional lookback period,
+ * and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups files
+ * into work units for parallel processing.
+ *
+ * <pre>
+ * # Basic configuration
+ * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource
+ * iceberg.database.name=db1
+ * iceberg.table.name=table1
+ * iceberg.catalog.uri=https://openhouse.com/catalog
+ *
+ * # Partition filtering with lookback
+ * iceberg.filter.enabled=true
+ * iceberg.filter.expr=datepartition=2025-04-01
+ * iceberg.lookback.days=3
+ * </pre>
+ */
+@Slf4j
+public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> {
+
+  public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name";
+  public static final String ICEBERG_TABLE_NAME = "iceberg.table.name";
+  public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri";
+  public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class";
+  public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled";
+  public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
+  public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit";
+  public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
+  public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
+  public static final String ICEBERG_FILTER_EXPR = "iceberg.filter.expr"; // e.g., datepartition=2025-04-01
+  public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
+  public static final int DEFAULT_LOOKBACK_DAYS = 1;
+  public static final String ICEBERG_DATE_PARTITION_KEY = "datepartition"; // date partition key name
+  public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key";
+  public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values";
+  public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path";
+  public static final String ICEBERG_SIMULATE = "iceberg.simulate";
+  public static final String ICEBERG_MAX_SIZE_MULTI_WORKUNITS = "iceberg.binPacking.maxSizePerBin";
+  public static final String ICEBERG_MAX_WORK_UNITS_PER_BIN = "iceberg.binPacking.maxWorkUnitsPerBin";
+  private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
+
+  private Optional<LineageInfo> lineageInfo;
+  private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
+
+  /**
+   * Initialize file system helper based on mode (streaming vs record processing)
+   */
+  @Override
+  public void initFileSystemHelper(State state) throws FileBasedHelperException {
+    // For file streaming mode, we use IcebergFileStreamHelper
+    // For record processing mode, we'll use a different helper (future implementation)
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+        ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Future: Initialize helper for record processing
+      throw new UnsupportedOperationException("Record processing mode not yet implemented. " +
+          "This will be added when SQL/Data Lake destinations are required.");
+    } else {
+      // Initialize helper for file streaming - now implements TimestampAwareFileBasedHelper
+      this.fsHelper = new IcebergFileStreamHelper(state);
+      this.fsHelper.connect();
+    }
+  }
+
+  /**
+   * Get work units by discovering files from Iceberg table
+   * @param state is the source state
+   * @return List<WorkUnit> list of work units
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+    try {
+      initFileSystemHelper(state);
+
+      validateConfiguration(state);
+
+      IcebergCatalog catalog = createCatalog(state);
+      String database = state.getProp(ICEBERG_DATABASE_NAME);
+      String table = state.getProp(ICEBERG_TABLE_NAME);
+
+      IcebergTable icebergTable = catalog.openTable(database, table);
+      Preconditions.checkArgument(catalog.tableAlreadyExists(icebergTable),
+          String.format("OpenHouse table not found: %s.%s", database, table));
+
+      List<IcebergTable.FilePathWithPartition> filesWithPartitions = discoverPartitionFilePaths(state, icebergTable);
+      log.info("Discovered {} files from Iceberg table {}.{}", filesWithPartitions.size(), database, table);
+
+      // Create work units from discovered files
+      List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, state, icebergTable);
+
+      // Handle simulate mode - log what would be copied without executing
+      if (state.contains(ICEBERG_SIMULATE) && state.getPropAsBoolean(ICEBERG_SIMULATE)) {
+        log.info("Simulate mode enabled. Will not execute the copy.");
+        logSimulateMode(workUnits, filesWithPartitions, state);
+        return Lists.newArrayList();
+      }
+
+      // Apply bin packing to work units if configured
+      List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, state);
+      log.info("Work unit creation complete. Initial work units: {}, packed work units: {}",
+          workUnits.size(), packedWorkUnits.size());
+
+      return Lists.newArrayList(packedWorkUnits);
+
+    } catch (Exception e) {
+      log.error("Failed to create work units for Iceberg table", e);
+      throw new RuntimeException("Failed to create work units", e);
+    }
+  }
+
+  /**
+   * Get extractor based on mode (streaming vs record processing)
+   *
+   * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned {@link Extractor}
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException {
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+        ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Return record processing extractor
+      throw new UnsupportedOperationException("Record processing mode not yet implemented.");
+    } else {
+      // Return file streaming extractor
+      return new IcebergFileStreamExtractor(state);
+    }
+  }
+
+  /**
+   * Discover partition data files using Iceberg TableScan API with optional lookback for date partitions.
+   *
+   * <p>This method supports two modes:
+   * <ol>
+   *   <li><b>Full table scan</b>: When {@code iceberg.filter.enabled=false}, returns all data files from current snapshot</li>
+   *   <li><b>Partition filter</b>: When {@code iceberg.filter.enabled=true}, uses Iceberg TableScan with partition
+   *       filter and applies lookback period for date partitions</li>
+   * </ol>
+   *
+   * <p>For date partitions (partition key = {@value #ICEBERG_DATE_PARTITION_KEY}), the lookback period allows copying data for the last N days.
+   * For example, with {@code iceberg.filter.expr=datepartition=2025-04-03} and {@code iceberg.lookback.days=3},
+   * this will discover files for partitions: datepartition=2025-04-03, datepartition=2025-04-02, datepartition=2025-04-01
+   *
+   * @param state source state containing filter configuration
+   * @param icebergTable the Iceberg table to scan
+   * @return list of file paths with partition metadata matching the filter criteria
+   * @throws IOException if table scan or file discovery fails
+   */
+  private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws IOException {
+    boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true);
+
+    if (!filterEnabled) {
+      log.info("Partition filter disabled, discovering all data files from current snapshot");
+      IcebergSnapshotInfo snapshot = icebergTable.getCurrentSnapshotInfo();
+      List<IcebergTable.FilePathWithPartition> result = Lists.newArrayList();
+      for (IcebergSnapshotInfo.ManifestFileInfo mfi : snapshot.getManifestFiles()) {
+        for (String filePath : mfi.getListedFilePaths()) {
+          result.add(new IcebergTable.FilePathWithPartition(filePath, Maps.newHashMap()));
+        }
+      }
+      log.info("Discovered {} data files from snapshot", result.size());
+      return result;
+    }
+
+    // Parse filter expression
+    String expr = state.getProp(ICEBERG_FILTER_EXPR);
+    Preconditions.checkArgument(!StringUtils.isBlank(expr),
+        "iceberg.filter.expr is required when iceberg.filter.enabled=true");
+    String[] parts = expr.split("=", 2);
+    Preconditions.checkArgument(parts.length == 2,
+        "Invalid iceberg.filter.expr. Expected key=value, got: %s", expr);
+    String key = parts[0].trim();
+    String value = parts[1].trim();
+
+    // Apply lookback period for date partitions
+    // lookbackDays=1 (default) means copy only the specified date
+    // lookbackDays=3 means copy specified date + 2 previous days (total 3 days)
+    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+    List<String> values = Lists.newArrayList();
+
+    if (ICEBERG_DATE_PARTITION_KEY.equals(key) && lookbackDays >= 1) {
+      log.info("Applying lookback period of {} days for date partition: {}", lookbackDays, value);
+      LocalDate start = LocalDate.parse(value);
+      for (int i = 0; i < lookbackDays; i++) {
+        String partitionValue = start.minusDays(i).toString();
+        values.add(partitionValue);
+        log.debug("Including partition: {}={}", ICEBERG_DATE_PARTITION_KEY, partitionValue);
+      }
+    } else {
+      log.error("Partition key is not correct or lookbackDays < 1, skipping lookback. Input: {}={}, expected: {}=<date>",
+          key, value, ICEBERG_DATE_PARTITION_KEY);
+      throw new IllegalArgumentException(String.format(
+          "Only date partition filter with lookback period is supported. Expected partition key: '%s', got: '%s'",
+          ICEBERG_DATE_PARTITION_KEY, key));
+    }
+
+    // Store partition info on state for downstream use (extractor, destination path mapping)
+    state.setProp(ICEBERG_PARTITION_KEY, key);
+    state.setProp(ICEBERG_PARTITION_VALUES, String.join(",", values));
+
+    // Use Iceberg TableScan API to get only data files (parquet/orc/avro) for specified partitions
+    // TableScan.planFiles() returns DataFiles only - no manifest files or metadata files
+    log.info("Executing TableScan with filter: {}={}", key, values);
+    Expression icebergExpr = null;
+    for (String val : values) {
+      Expression e = Expressions.equal(key, val);
+      icebergExpr = (icebergExpr == null) ? e : Expressions.or(icebergExpr, e);
+    }

Review Comment:
We should move this logic into a new separate class `IcebergExpressionGenerator`, similar to what is done in Hive with `public class LookbackPartitionFilterGenerator implements PartitionFilterGenerator`. Currently this only works for one use case. If we need to extend it to further use cases, such as a simple copy of data for partitionCol with partitionVal1, partitionVal2, ..., partitionValN, it will not be easy to do with the current implementation.
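Rough sketch of the shape I am suggesting, mirroring the Hive `PartitionFilterGenerator` pattern. The interface and class names follow the suggestion above, but the method name `getFilterExpression`, the reuse of `iceberg.partition.key` / `iceberg.partition.values` as input keys, and the simplified property parsing are all illustrative assumptions, not a final design.

```java
package org.apache.gobblin.data.management.copy.iceberg;

import java.time.LocalDate;
import java.util.List;

import com.google.common.collect.Lists;

import org.apache.gobblin.configuration.State;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

/** Pluggable strategy that turns job config into an Iceberg filter {@link Expression}. */
public interface IcebergExpressionGenerator {
  Expression getFilterExpression(State state);

  /** Shared helper: OR together one equality predicate per partition value. */
  static Expression orOfEquals(String partitionKey, List<String> partitionValues) {
    Expression expr = Expressions.alwaysFalse();
    for (String value : partitionValues) {
      expr = Expressions.or(expr, Expressions.equal(partitionKey, value));
    }
    return expr;
  }
}

/** Lookback flavour: datepartition IN (startDate, startDate-1, ..., startDate-(lookbackDays-1)). */
class LookbackPartitionExpressionGenerator implements IcebergExpressionGenerator {
  @Override
  public Expression getFilterExpression(State state) {
    // Parsing kept minimal for the sketch; reuses the existing filter-expr / lookback keys.
    String startDate = state.getProp(IcebergSource.ICEBERG_FILTER_EXPR).split("=", 2)[1].trim();
    int lookbackDays = state.getPropAsInt(IcebergSource.ICEBERG_LOOKBACK_DAYS, IcebergSource.DEFAULT_LOOKBACK_DAYS);
    List<String> values = Lists.newArrayList();
    for (int i = 0; i < lookbackDays; i++) {
      values.add(LocalDate.parse(startDate).minusDays(i).toString());
    }
    return IcebergExpressionGenerator.orOfEquals(IcebergSource.ICEBERG_DATE_PARTITION_KEY, values);
  }
}

/** Explicit-values flavour: partitionCol IN (partitionVal1, ..., partitionValN). */
class ExplicitValuesExpressionGenerator implements IcebergExpressionGenerator {
  @Override
  public Expression getFilterExpression(State state) {
    // Reusing the existing key/values property names purely for illustration.
    String key = state.getProp(IcebergSource.ICEBERG_PARTITION_KEY);
    List<String> values = state.getPropAsList(IcebergSource.ICEBERG_PARTITION_VALUES);
    return IcebergExpressionGenerator.orOfEquals(key, values);
  }
}
```

With this split, a plain list-of-values copy becomes another small generator class instead of another branch inside `discoverPartitionFilePaths`, and the source can pick the generator via config.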
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]