Blazer-007 commented on code in PR #4171: URL: https://github.com/apache/gobblin/pull/4171#discussion_r2922629725
########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionFilterGenerator.java: ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + + +/** + * Utility for building Iceberg partition filter {@link Expression}s over date/datetime ranges. 
+ * + * <p>This class is intentionally config-agnostic: callers resolve their own + * {@link DateTimeFormatter}, start datetime, and lookback count, then delegate to one of + * the two factory methods: + * <ul> + * <li>{@link #forDays} — one partition value per calendar day (daily granularity)</li> + * <li>{@link #forHours} — one partition value per clock hour (hourly granularity, naturally + * crosses midnight)</li> + * </ul> + * + * <p>Both methods return a {@link FilterResult} containing the ordered list of partition + * value strings (most-recent first) and the combined Iceberg OR expression ready to be + * passed to a TableScan. + * + * <p>The utility is format-agnostic: any valid {@link DateTimeFormatter} pattern works, + * so tables with partition formats such as {@code yyyy-MM-dd-HH}, {@code dd-MM-yyyy-HH}, + * {@code yyyyMMdd}, or {@code yyyy-MM-dd} are all handled by the same code path. + * + * <p>Example — daily lookback with hourly partition format: + * <pre>{@code + * DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"); + * LocalDateTime start = LocalDateTime.of(2025, 4, 3, 5, 0); // 2025-04-03 at 05:00 + * + * FilterResult result = IcebergPartitionFilterGenerator.forDays("datepartition", start, 3, fmt); + * // partitionValues → ["2025-04-03-05", "2025-04-02-05", "2025-04-01-05"] + * }</pre> + * + * <p>Example — hourly lookback crossing midnight: + * <pre>{@code + * DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH"); + * LocalDateTime start = LocalDateTime.of(2025, 4, 1, 1, 0); // 2025-04-01 at 01:00 + * + * FilterResult result = IcebergPartitionFilterGenerator.forHours("datepartition", start, 3, fmt); + * // partitionValues → ["2025-04-01-01", "2025-04-01-00", "2025-03-31-23"] + * }</pre> + */ +@Slf4j +public class IcebergPartitionFilterGenerator { + + /** + * Immutable result returned by both factory methods. 
+ */ + @Getter + @AllArgsConstructor + public static class FilterResult { + /** Ordered list of partition value strings, most-recent first. */ + private final List<String> partitionValues; + /** Iceberg OR expression that matches any of the partition values. */ + private final Expression filterExpression; + } + + /** + * Generate a partition filter by stepping back {@code lookbackDays} calendar days from + * {@code start}, producing one partition value per day. + * + * <p>The hour/minute of {@code start} is preserved at each step. This means a formatter + * pattern that includes {@code HH} (e.g. {@code yyyy-MM-dd-HH}) will embed the same hour + * in every generated value, giving results like: + * {@code ["2025-04-03-05", "2025-04-02-05", "2025-04-01-05"]}. + * + * @param partitionColumn Iceberg partition column name (e.g. {@code datepartition}) + * @param start reference datetime; the hour component is used when the formatter + * includes an hour field + * @param lookbackDays number of daily partition values to produce; must be ≥ 1 + * @param formatter applied to each stepped datetime to produce the partition value string + * @return {@link FilterResult} with partition values and the combined OR expression + * @throws IllegalArgumentException if {@code lookbackDays} < 1 + */ + public static FilterResult forDays(String partitionColumn, LocalDateTime start, + int lookbackDays, DateTimeFormatter formatter) { + if (lookbackDays < 1) { + throw new IllegalArgumentException( + String.format("lookbackDays must be >= 1, got: %d", lookbackDays)); + } + List<String> values = new ArrayList<>(lookbackDays); + for (int i = 0; i < lookbackDays; i++) { + String val = start.minusDays(i).format(formatter); + values.add(val); + log.info("Including daily partition: {}={}", partitionColumn, val); + } + return new FilterResult(Collections.unmodifiableList(values), + buildStartsWithOrExpression(partitionColumn, values)); + } Review Comment: I don't think we should be adding startsWith as 
it can copy unwanted data. If users want finer granularity, they can use the hours lookback. For example, a user may want the data for yyyy-MM-dd, but if the column also holds hourly partition values such as yyyy-MM-dd-HH, those will get copied as well. ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java: ########## @@ -129,7 +149,25 @@ public class IcebergSource extends FileBasedSource<String, FileAwareInputStream> public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path"; public static final String ICEBERG_HOURLY_PARTITION_ENABLED = "iceberg.hourly.partition.enabled"; public static final boolean DEFAULT_HOURLY_PARTITION_ENABLED = true; - private static final String HOURLY_PARTITION_SUFFIX = "-00"; + public static final String ICEBERG_PARTITION_HOUR = "iceberg.partition.hour"; // specific hour (0-23) used with hourly/format-based partition, defaults to 0 + /** + * Optional {@link DateTimeFormatter} pattern controlling how the partition value is rendered. + * The input date ({@code iceberg.filter.date}) is always supplied in {@code yyyy-MM-dd} form; + * this pattern governs the <em>output</em> string used in the filter expression. + * + * <p>Examples: + * <ul> + * <li>{@code yyyy-MM-dd} → {@code 2025-04-01} (daily, no hour)</li> + * <li>{@code yyyy-MM-dd-HH} → {@code 2025-04-01-05} (hourly, hour from {@code iceberg.partition.hour})</li> + * <li>{@code dd-MM-yyyy-HH} → {@code 01-04-2025-00} (reversed-date hourly)</li> + * <li>{@code yyyyMMdd} → {@code 20250401} (compact daily)</li> + * </ul> + * + * <p>When this property is set it supersedes {@code iceberg.hourly.partition.enabled}. + * When absent the legacy {@code iceberg.hourly.partition.enabled} / {@code iceberg.partition.hour} + * behaviour is preserved for backward compatibility. 
+ */ + public static final String ICEBERG_PARTITION_VALUE_FORMAT = "iceberg.partition.value.format"; Review Comment: Since this is only applied when the column type is datepartition, let's rename the config to `"iceberg.partition.value.datetime.format"`. ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java: ########## @@ -80,23 +82,39 @@ * iceberg.table.name=table1 * iceberg.catalog.uri=ICEBERG_CATALOG_URI * - * # Partition filtering with lookback - Static date (hourly partitions by default) + * # Partition filtering with lookback - Static date * iceberg.filter.enabled=true - * iceberg.partition.column=datepartition # Optional, defaults to "datepartition" - * iceberg.filter.date=2025-04-01 # Date value (yyyy-MM-dd format) + * iceberg.partition.column=datepartition # Optional, defaults to "datepartition" + * iceberg.filter.date=2025-04-01 # Input date always in yyyy-MM-dd format * iceberg.lookback.days=3 - * # iceberg.hourly.partition.enabled=true # Optional, defaults to true; generates yyyy-MM-dd-00 format * - * # Partition filtering with lookback - Scheduled flows (dynamic date) + * # Partition filtering - Scheduled flows (dynamic date) * iceberg.filter.enabled=true - * iceberg.filter.date=CURRENT_DATE # Uses current date at runtime + * iceberg.filter.date=CURRENT_DATE # Resolved to current date at runtime * iceberg.lookback.days=3 * - * # Standard daily partitions (disable hourly suffix if table uses yyyy-MM-dd format) - * iceberg.filter.enabled=true - * iceberg.filter.date=2025-04-01 # Date in yyyy-MM-dd format - * iceberg.hourly.partition.enabled=false # Set false for non-hourly partitions - * iceberg.lookback.days=3 + * # --- Recommended: configurable partition value format --- + * # iceberg.partition.value.format is a DateTimeFormatter pattern applied to the output + * # partition value used in the filter expression. The input date is always yyyy-MM-dd. 
+ * # When set, it supersedes iceberg.hourly.partition.enabled. + * + * # Standard hourly partitions (yyyy-MM-dd-HH) + * iceberg.partition.value.format=yyyy-MM-dd-HH # → "2025-04-01-05" + * iceberg.partition.hour=5 # Hour 0-23, default 0 Review Comment: More specifically, since users schedule flows, they might not know the hour at which their data gets generated. For one-time use cases we can add another way, like filter.column and filter.value, which apply directly. ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java: ########## @@ -80,23 +82,39 @@ * iceberg.table.name=table1 * iceberg.catalog.uri=ICEBERG_CATALOG_URI * - * # Partition filtering with lookback - Static date (hourly partitions by default) + * # Partition filtering with lookback - Static date * iceberg.filter.enabled=true - * iceberg.partition.column=datepartition # Optional, defaults to "datepartition" - * iceberg.filter.date=2025-04-01 # Date value (yyyy-MM-dd format) + * iceberg.partition.column=datepartition # Optional, defaults to "datepartition" + * iceberg.filter.date=2025-04-01 # Input date always in yyyy-MM-dd format * iceberg.lookback.days=3 - * # iceberg.hourly.partition.enabled=true # Optional, defaults to true; generates yyyy-MM-dd-00 format * - * # Partition filtering with lookback - Scheduled flows (dynamic date) + * # Partition filtering - Scheduled flows (dynamic date) * iceberg.filter.enabled=true - * iceberg.filter.date=CURRENT_DATE # Uses current date at runtime + * iceberg.filter.date=CURRENT_DATE # Resolved to current date at runtime * iceberg.lookback.days=3 * - * # Standard daily partitions (disable hourly suffix if table uses yyyy-MM-dd format) - * iceberg.filter.enabled=true - * iceberg.filter.date=2025-04-01 # Date in yyyy-MM-dd format - * iceberg.hourly.partition.enabled=false # Set false for non-hourly partitions - * iceberg.lookback.days=3 + * # --- Recommended: configurable partition value format --- + * # 
iceberg.partition.value.format is a DateTimeFormatter pattern applied to the output + * # partition value used in the filter expression. The input date is always yyyy-MM-dd. + * # When set, it supersedes iceberg.hourly.partition.enabled. + * + * # Standard hourly partitions (yyyy-MM-dd-HH) + * iceberg.partition.value.format=yyyy-MM-dd-HH # → "2025-04-01-05" + * iceberg.partition.hour=5 # Hour 0-23, default 0 Review Comment: What is the use of this config? In the date-based filter we have CURRENT_DATE as a placeholder; shouldn't we have a similar CURRENT_DATE + CURRENT_HOUR when the pattern is yyyy-MM-dd-HH? ########## gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java: ########## @@ -286,62 +335,112 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour log.info("Resolved {} placeholder to current date: {}", CURRENT_DATE_PLACEHOLDER, dateValue); } - // Apply lookback period for date partitions - // lookbackDays=1 (default) means copy only the specified date - // lookbackDays=3 means copy specified date + 2 previous days (total 3 days) - int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS); - List<String> values = Lists.newArrayList(); + // Resolve hour (0-23) — shared by both daily and hourly lookback paths. + int hour = 0; + if (state.contains(ICEBERG_PARTITION_HOUR)) { + hour = state.getPropAsInt(ICEBERG_PARTITION_HOUR, 0); + Preconditions.checkArgument(hour >= 0 && hour <= 23, + String.format("iceberg.partition.hour must be between 0 and 23, got: %d", hour)); + } + + // Resolve the DateTimeFormatter used to render each partition value. + // resolvePartitionFormatter normalises both the new iceberg.partition.value.format path + // and the legacy iceberg.hourly.partition.enabled path into a single formatter. 
+ DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state, hour); - if (lookbackDays >= 1) { - log.info("Applying lookback period of {} days for date partition column '{}': {}", lookbackDays, datePartitionColumn, dateValue); + // Parse the input date — always expected in canonical yyyy-MM-dd form. + LocalDate startDate; + try { + startDate = LocalDate.parse(dateValue); + } catch (java.time.format.DateTimeParseException e) { Review Comment: This should be in the format passed through the config so that it is consistent; we can then remove the partition hour config, and CURRENT_DATE can be resolved using LocalDateTime in the given pattern. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
