khandelwal-prateek commented on code in PR #4146:
URL: https://github.com/apache/gobblin/pull/4146#discussion_r2510757157
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -0,0 +1,595 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.io.IOException;
+import java.net.URI;
+import java.time.LocalDate;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.dataset.DatasetConstants;
+import org.apache.gobblin.dataset.DatasetDescriptor;
+import org.apache.gobblin.metrics.event.lineage.LineageInfo;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.source.extractor.filebased.FileBasedHelperException;
+import org.apache.gobblin.source.extractor.filebased.FileBasedSource;
+import org.apache.gobblin.source.workunit.Extract;
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.source.workunit.WorkUnitWeighter;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.binpacking.FieldWeighter;
+import org.apache.gobblin.util.binpacking.WorstFitDecreasingBinPacking;
+import org.apache.gobblin.service.ServiceConfigKeys;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+/**
+ * Unified Iceberg source that supports partition-based data copying from Iceberg tables.
+ *
+ * This source reads the job configuration, applies date-partition filters with an optional lookback
+ * period, and uses Iceberg's TableScan API to enumerate data files for specific partitions. It groups
+ * the files into work units for parallel processing.
+ *
+ * <pre>
+ * # Basic configuration
+ * source.class=org.apache.gobblin.data.management.copy.iceberg.IcebergSource
+ * iceberg.database.name=db1
+ * iceberg.table.name=table1
+ * iceberg.catalog.uri=ICEBERG_CATALOG_URI
+ *
+ * # Partition filtering with lookback - static date (hourly partitions by default)
+ * iceberg.filter.enabled=true
+ * iceberg.partition.column=datepartition    # Optional, defaults to "datepartition"
+ * iceberg.filter.date=2025-04-01            # Date value (yyyy-MM-dd format)
+ * iceberg.lookback.days=3
+ * # iceberg.hourly.partition.enabled=true   # Optional, defaults to true; generates yyyy-MM-dd-00 format
+ *
+ * # Partition filtering with lookback - scheduled flows (dynamic date)
+ * iceberg.filter.enabled=true
+ * iceberg.filter.date=CURRENT_DATE          # Uses the current date at runtime
+ * iceberg.lookback.days=3
+ *
+ * # Standard daily partitions (disable the hourly suffix if the table uses yyyy-MM-dd format)
+ * iceberg.filter.enabled=true
+ * iceberg.filter.date=2025-04-01            # Date in yyyy-MM-dd format
+ * iceberg.hourly.partition.enabled=false    # Set false for non-hourly partitions
+ * iceberg.lookback.days=3
+ *
+ * # Copy all data (no filtering)
+ * iceberg.filter.enabled=false
+ * # No filter.date needed - will copy all partitions from the current snapshot
+ *
+ * # Bin packing for better resource utilization
+ * gobblin.copy.binPacking.maxSizePerBin=1000000000   # 1GB per bin
+ * </pre>
+ */
+@Slf4j
+public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> {
+
+  public static final String ICEBERG_DATABASE_NAME = "iceberg.database.name";
+  public static final String ICEBERG_TABLE_NAME = "iceberg.table.name";
+  public static final String ICEBERG_CATALOG_URI = "iceberg.catalog.uri";
+  public static final String ICEBERG_CATALOG_CLASS = "iceberg.catalog.class";
+  public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+  public static final String ICEBERG_RECORD_PROCESSING_ENABLED = "iceberg.record.processing.enabled";
+  public static final boolean DEFAULT_RECORD_PROCESSING_ENABLED = false;
+  public static final String ICEBERG_FILES_PER_WORKUNIT = "iceberg.files.per.workunit";
+  public static final int DEFAULT_FILES_PER_WORKUNIT = 10;
+  public static final String ICEBERG_FILTER_ENABLED = "iceberg.filter.enabled";
+  public static final String ICEBERG_FILTER_DATE = "iceberg.filter.date"; // Date value (e.g., 2025-04-01 or CURRENT_DATE)
+  public static final String ICEBERG_LOOKBACK_DAYS = "iceberg.lookback.days";
+  public static final int DEFAULT_LOOKBACK_DAYS = 1;
+  public static final String ICEBERG_PARTITION_COLUMN = "iceberg.partition.column"; // configurable partition column name
+  public static final String DEFAULT_DATE_PARTITION_COLUMN = "datepartition"; // default date partition column name
+  public static final String CURRENT_DATE_PLACEHOLDER = "CURRENT_DATE"; // placeholder for the current date
+  public static final String ICEBERG_PARTITION_KEY = "iceberg.partition.key";
+  public static final String ICEBERG_PARTITION_VALUES = "iceberg.partition.values";
+  public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path";
+  public static final String ICEBERG_HOURLY_PARTITION_ENABLED = "iceberg.hourly.partition.enabled";
+  public static final boolean DEFAULT_HOURLY_PARTITION_ENABLED = true;
+  private static final String HOURLY_PARTITION_SUFFIX = "-00";
+  private static final String WORK_UNIT_WEIGHT = "iceberg.workUnitWeight";
+
+  private Optional<LineageInfo> lineageInfo;
+  private final WorkUnitWeighter weighter = new FieldWeighter(WORK_UNIT_WEIGHT);
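[As a quick illustration of how the configuration keys above combine in practice, here is a minimal, self-contained sketch. This is an editorial example rather than part of the PR diff, and the catalog URI value is a placeholder:]

    import java.util.Properties;

    public class IcebergSourceConfigExample {
      public static void main(String[] args) {
        // Static date filter with a 3-day lookback, mirroring the class javadoc above.
        Properties props = new Properties();
        props.setProperty("source.class",
            "org.apache.gobblin.data.management.copy.iceberg.IcebergSource");
        props.setProperty("iceberg.database.name", "db1");
        props.setProperty("iceberg.table.name", "table1");
        props.setProperty("iceberg.catalog.uri", "thrift://metastore-host:9083"); // placeholder
        props.setProperty("iceberg.filter.enabled", "true");
        props.setProperty("iceberg.filter.date", "2025-04-01");
        props.setProperty("iceberg.lookback.days", "3");
        // With the hourly suffix on by default, this targets datepartition values
        // 2025-04-01-00, 2025-03-31-00 and 2025-03-30-00.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
      }
    }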
+
+  /**
+   * Initializes the file system helper based on the mode (file streaming vs. record processing).
+   */
+  @Override
+  public void initFileSystemHelper(State state) throws FileBasedHelperException {
+    // For file streaming mode, we use IcebergFileStreamHelper.
+    // For record processing mode, we'll use a different helper (future implementation).
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+        ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Future: initialize a helper for record processing
+      throw new UnsupportedOperationException("Record processing mode not yet implemented. "
+          + "This will be added when SQL/Data Lake destinations are required.");
+    } else {
+      // Initialize the helper for file streaming - now implements TimestampAwareFileBasedHelper
+      this.fsHelper = new IcebergFileStreamHelper(state);
+      this.fsHelper.connect();
+    }
+  }
+
+  /**
+   * Creates work units by discovering files from the Iceberg table.
+   *
+   * @param state the source state
+   * @return the list of work units
+   */
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    this.lineageInfo = LineageInfo.getLineageInfo(state.getBroker());
+
+    try {
+      initFileSystemHelper(state);
+
+      validateConfiguration(state);
+
+      IcebergCatalog catalog = createCatalog(state);
+      String database = state.getProp(ICEBERG_DATABASE_NAME);
+      String table = state.getProp(ICEBERG_TABLE_NAME);
+      IcebergTable icebergTable = catalog.openTable(database, table);
+
+      List<IcebergTable.FilePathWithPartition> filesWithPartitions = discoverPartitionFilePaths(state, icebergTable);
+      log.info("Discovered {} files from Iceberg table {}.{}", filesWithPartitions.size(), database, table);
+
+      // Create work units from the discovered files
+      List<WorkUnit> workUnits = createWorkUnitsFromFiles(filesWithPartitions, state, icebergTable);
+
+      // Handle simulate mode - log what would be copied without executing
+      if (state.contains(CopySource.SIMULATE) && state.getPropAsBoolean(CopySource.SIMULATE)) {
+        log.info("Simulate mode enabled. Will not execute the copy.");
+        logSimulateMode(workUnits, filesWithPartitions, state);
+        return Lists.newArrayList();
+      }
+
+      // Apply bin packing to the work units if configured
+      List<? extends WorkUnit> packedWorkUnits = applyBinPacking(workUnits, state);
+      log.info("Work unit creation complete. Initial work units: {}, packed work units: {}",
+          workUnits.size(), packedWorkUnits.size());
+
+      return Lists.newArrayList(packedWorkUnits);
+
+    } catch (Exception e) {
+      log.error("Failed to create work units for Iceberg table", e);
+      throw new RuntimeException("Failed to create work units", e);
+    }
+  }
+
+  /**
+   * Returns an extractor based on the mode (file streaming vs. record processing).
+   *
+   * @param state a {@link org.apache.gobblin.configuration.WorkUnitState} carrying properties needed by the returned {@link Extractor}
+   * @return an {@link Extractor} that streams Iceberg data files
+   * @throws IOException if the extractor cannot be created
+   */
+  @Override
+  public Extractor<String, FileAwareInputStream> getExtractor(WorkUnitState state) throws IOException {
+    boolean recordProcessingEnabled = state.getPropAsBoolean(
+        ICEBERG_RECORD_PROCESSING_ENABLED, DEFAULT_RECORD_PROCESSING_ENABLED);
+
+    if (recordProcessingEnabled) {
+      // Return a record processing extractor (not yet available)
+      throw new UnsupportedOperationException("Record processing mode not yet implemented.");
+    } else {
+      // Return a file streaming extractor
+      return new IcebergFileStreamExtractor(state);
+    }
+  }
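[Before the discovery method below, it may help to see its lookback expansion in isolation. This is a minimal editorial sketch, not code from the PR; expandPartitionValues is a hypothetical helper that mirrors the rules documented in the javadoc that follows:]

    import java.time.LocalDate;
    import java.util.ArrayList;
    import java.util.List;

    public class LookbackExpansionSketch {

      // Hypothetical helper: lookbackDays=1 keeps only the given date; lookbackDays=3
      // adds the 2 previous days; the "-00" suffix is appended when hourly
      // partitioning is enabled (the default).
      static List<String> expandPartitionValues(String isoDate, int lookbackDays, boolean hourly) {
        LocalDate start = LocalDate.parse(isoDate); // expects yyyy-MM-dd
        List<String> values = new ArrayList<>();
        for (int i = 0; i < lookbackDays; i++) {
          String value = start.minusDays(i).toString();
          values.add(hourly ? value + "-00" : value);
        }
        return values;
      }

      public static void main(String[] args) {
        // Mirrors the "Static" javadoc example, with the default hourly suffix:
        // [2025-04-03-00, 2025-04-02-00, 2025-04-01-00]
        System.out.println(expandPartitionValues("2025-04-03", 3, true));
      }
    }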
+
+  /**
+   * Discovers partition data files using Iceberg's TableScan API, with an optional lookback for date partitions.
+   *
+   * <p>This method supports three modes:
+   * <ol>
+   *   <li><b>Full table scan (copy all data)</b>: Set {@code iceberg.filter.enabled=false}.
+   *       Returns all data files from the current snapshot. Use this for one-time full copies or backfills.</li>
+   *   <li><b>Static date partition filter</b>: Set {@code iceberg.filter.enabled=true} with a specific date
+   *       (e.g., {@code iceberg.filter.date=2025-04-01}). Use this for ad-hoc historical data copies.</li>
+   *   <li><b>Dynamic date partition filter</b>: Set {@code iceberg.filter.enabled=true} with
+   *       {@code iceberg.filter.date=CURRENT_DATE}. The {@value #CURRENT_DATE_PLACEHOLDER} placeholder
+   *       is resolved to the current date at runtime. Use this for daily scheduled flows.</li>
+   * </ol>
+   *
+   * <p>The partition column name is configurable via {@code iceberg.partition.column}
+   * (defaults to {@value #DEFAULT_DATE_PARTITION_COLUMN}). The date value is specified separately via
+   * {@code iceberg.filter.date} in standard format ({@code yyyy-MM-dd}).
+   *
+   * <p><b>Hourly Partition Support:</b> By default, the {@code -00} suffix is appended to all partition values
+   * for tables using the hourly partition format ({@code yyyy-MM-dd-HH}). For tables using the standard daily
+   * partition format ({@code yyyy-MM-dd}), set {@code iceberg.hourly.partition.enabled=false}. Users should
+   * always provide dates in standard {@code yyyy-MM-dd} format; the hour suffix is managed automatically.
+   *
+   * <p><b>Configuration Examples:</b>
+   * <ul>
+   *   <li>Static: {@code iceberg.partition.column=datepartition, iceberg.filter.date=2025-04-03, iceberg.lookback.days=3}
+   *       discovers: datepartition=2025-04-03, 2025-04-02, 2025-04-01</li>
+   *   <li>Dynamic: {@code iceberg.filter.date=CURRENT_DATE, iceberg.lookback.days=1}
+   *       discovers today's partition only (resolved at runtime)</li>
+   * </ul>
+   *
+   * @param state source state containing the filter configuration
+   * @param icebergTable the Iceberg table to scan
+   * @return list of file paths with partition metadata matching the filter criteria
+   * @throws IOException if the table scan or file discovery fails
+   */
+  private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(SourceState state, IcebergTable icebergTable) throws IOException {
+    boolean filterEnabled = state.getPropAsBoolean(ICEBERG_FILTER_ENABLED, true);
+
+    if (!filterEnabled) {
+      log.info("Partition filter disabled, discovering all data files with partition metadata from current snapshot");
+      // Use a TableScan without a filter to get all files with partition metadata preserved.
+      // This ensures the partition structure is maintained even for full table copies.
+      List<IcebergTable.FilePathWithPartition> result = icebergTable.getFilePathsWithPartitionsForFilter(Expressions.alwaysTrue());
+      log.info("Discovered {} data files from current snapshot with partition metadata", result.size());
+      return result;
+    }
+
+    String datePartitionColumn = state.getProp(ICEBERG_PARTITION_COLUMN, DEFAULT_DATE_PARTITION_COLUMN);
+
+    String dateValue = state.getProp(ICEBERG_FILTER_DATE);
+    Preconditions.checkArgument(!StringUtils.isBlank(dateValue),
+        "iceberg.filter.date is required when iceberg.filter.enabled=true");
+
+    // Handle the CURRENT_DATE placeholder for scheduled flows
+    if (CURRENT_DATE_PLACEHOLDER.equalsIgnoreCase(dateValue)) {
+      dateValue = LocalDate.now().toString();
+      log.info("Resolved {} placeholder to current date: {}", CURRENT_DATE_PLACEHOLDER, dateValue);
+    }
+
+    // Apply the lookback period for date partitions:
+    // lookbackDays=1 (default) means copy only the specified date;
+    // lookbackDays=3 means copy the specified date plus the 2 previous days (3 days total).
+    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+    List<String> values = Lists.newArrayList();
+
+    if (lookbackDays >= 1) {
+      log.info("Applying lookback period of {} days for date partition column '{}': {}", lookbackDays, datePartitionColumn, dateValue);
+
+      // Check whether hourly partitioning is enabled
+      boolean isHourlyPartition = state.getPropAsBoolean(ICEBERG_HOURLY_PARTITION_ENABLED, DEFAULT_HOURLY_PARTITION_ENABLED);
+
+      // Parse the date in standard yyyy-MM-dd format
+      LocalDate start = LocalDate.parse(dateValue);

Review Comment:
   thanks.. updated


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
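[For readers following along: the quoted hunk stops right after the start date is parsed, before the scan filter is assembled. Below is a minimal editorial sketch of how the expanded partition values could feed Iceberg's expression API. This is an assumption: Expressions.in is part of Iceberg's public expression API, but the PR's actual filter construction is not visible in this excerpt.]

    import java.util.Arrays;
    import java.util.List;

    import org.apache.iceberg.expressions.Expression;
    import org.apache.iceberg.expressions.Expressions;

    public class PartitionFilterSketch {

      // Hypothetical continuation: restrict the table scan to the expanded partition
      // values (e.g., ["2025-04-03-00", "2025-04-02-00"]) on the configured column.
      static Expression buildFilter(String partitionColumn, List<String> values) {
        return Expressions.in(partitionColumn, values);
      }

      public static void main(String[] args) {
        Expression expr = buildFilter("datepartition",
            Arrays.asList("2025-04-03-00", "2025-04-02-00"));
        System.out.println(expr); // prints the predicate; exact string form may vary
      }
    }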
