Blazer-007 commented on code in PR #4171:
URL: https://github.com/apache/gobblin/pull/4171#discussion_r2922629725


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergPartitionFilterGenerator.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.iceberg;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Utility for building Iceberg partition filter {@link Expression}s over date/datetime ranges.
+ *
+ * <p>This class is intentionally config-agnostic: callers resolve their own
+ * {@link DateTimeFormatter}, start datetime, and lookback count, then delegate to one of
+ * the two factory methods:
+ * <ul>
+ *   <li>{@link #forDays}  — one partition value per calendar day (daily granularity)</li>
+ *   <li>{@link #forHours} — one partition value per clock hour (hourly granularity, naturally
+ *       crosses midnight)</li>
+ * </ul>
+ *
+ * <p>Both methods return a {@link FilterResult} containing the ordered list of partition
+ * value strings (most-recent first) and the combined Iceberg OR expression ready to be
+ * passed to a TableScan.
+ *
+ * <p>The utility is format-agnostic: any valid {@link DateTimeFormatter} pattern works,
+ * so tables with partition formats such as {@code yyyy-MM-dd-HH}, {@code dd-MM-yyyy-HH},
+ * {@code yyyyMMdd}, or {@code yyyy-MM-dd} are all handled by the same code path.
+ *
+ * <p>Example — daily lookback with hourly partition format:
+ * <pre>{@code
+ * DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+ * LocalDateTime start   = LocalDateTime.of(2025, 4, 3, 5, 0);   // 2025-04-03 at 05:00
+ *
+ * FilterResult result = IcebergPartitionFilterGenerator.forDays("datepartition", start, 3, fmt);
+ * // partitionValues → ["2025-04-03-05", "2025-04-02-05", "2025-04-01-05"]
+ * }</pre>
+ *
+ * <p>Example — hourly lookback crossing midnight:
+ * <pre>{@code
+ * DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+ * LocalDateTime start   = LocalDateTime.of(2025, 4, 1, 1, 0);   // 2025-04-01 at 01:00
+ *
+ * FilterResult result = IcebergPartitionFilterGenerator.forHours("datepartition", start, 3, fmt);
+ * // partitionValues → ["2025-04-01-01", "2025-04-01-00", "2025-03-31-23"]
+ * }</pre>
+ */
+@Slf4j
+public class IcebergPartitionFilterGenerator {
+
+  /**
+   * Immutable result returned by both factory methods.
+   */
+  @Getter
+  @AllArgsConstructor
+  public static class FilterResult {
+    /** Ordered list of partition value strings, most-recent first. */
+    private final List<String> partitionValues;
+    /** Iceberg OR expression that matches any of the partition values. */
+    private final Expression filterExpression;
+  }
+
+  /**
+   * Generate a partition filter by stepping back {@code lookbackDays} calendar days from
+   * {@code start}, producing one partition value per day.
+   *
+   * <p>The hour/minute of {@code start} is preserved at each step.  This means a formatter
+   * pattern that includes {@code HH} (e.g. {@code yyyy-MM-dd-HH}) will embed the same hour
+   * in every generated value, giving results like:
+   * {@code ["2025-04-03-05", "2025-04-02-05", "2025-04-01-05"]}.
+   *
+   * @param partitionColumn  Iceberg partition column name (e.g. {@code datepartition})
+   * @param start            reference datetime; the hour component is used when the formatter
+   *                         includes an hour field
+   * @param lookbackDays     number of daily partition values to produce; must be &ge; 1
+   * @param formatter        applied to each stepped datetime to produce the partition value string
+   * @return {@link FilterResult} with partition values and the combined OR expression
+   * @throws IllegalArgumentException if {@code lookbackDays} &lt; 1
+   */
+  public static FilterResult forDays(String partitionColumn, LocalDateTime start,
+      int lookbackDays, DateTimeFormatter formatter) {
+    if (lookbackDays < 1) {
+      throw new IllegalArgumentException(
+          String.format("lookbackDays must be >= 1, got: %d", lookbackDays));
+    }
+    List<String> values = new ArrayList<>(lookbackDays);
+    for (int i = 0; i < lookbackDays; i++) {
+      String val = start.minusDays(i).format(formatter);
+      values.add(val);
+      log.info("Including daily partition: {}={}", partitionColumn, val);
+    }
+    return new FilterResult(Collections.unmodifiableList(values),
+        buildStartsWithOrExpression(partitionColumn, values));
+  }

Review Comment:
   I don't think we should add startsWith here, as it can copy unwanted data. 
If users want finer granularity, they can use the hours lookback.
   For example, a user may want data for yyyy-MM-dd, but if the column also 
holds hourly partition values such as yyyy-MM-dd-HH, those will be copied as 
well.
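
To illustrate the concern above, here is a minimal sketch in plain Java (no Iceberg dependency) that compares prefix matching with exact matching over a hypothetical set of partition values. The class name, method names, and sample values are assumptions made for illustration; in the PR the prefix match comes from `buildStartsWithOrExpression`, whose body is not shown in this hunk.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class; not part of the PR.
class PrefixVersusExactMatch {

  /** Partitions selected by a prefix (startsWith-style) match. */
  public static List<String> matchByPrefix(List<String> partitions, String value) {
    return partitions.stream()
        .filter(p -> p.startsWith(value))
        .collect(Collectors.toList());
  }

  /** Partitions selected by an exact (equality-style) match. */
  public static List<String> matchExactly(List<String> partitions, String value) {
    return partitions.stream()
        .filter(p -> p.equals(value))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Assumed sample column holding both daily and hourly partition values.
    List<String> partitions =
        Arrays.asList("2025-04-01", "2025-04-01-00", "2025-04-01-05", "2025-04-02");

    System.out.println("prefix: " + matchByPrefix(partitions, "2025-04-01"));
    System.out.println("exact:  " + matchExactly(partitions, "2025-04-01"));
  }
}
```

With these sample values, the prefix match also selects the two hourly partitions for 2025-04-01, while the exact match selects only the daily one.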



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -129,7 +149,25 @@ public class IcebergSource extends FileBasedSource<String, FileAwareInputStream>
   public static final String ICEBERG_FILE_PARTITION_PATH = "iceberg.file.partition.path";
   public static final String ICEBERG_HOURLY_PARTITION_ENABLED = "iceberg.hourly.partition.enabled";
   public static final boolean DEFAULT_HOURLY_PARTITION_ENABLED = true;
-  private static final String HOURLY_PARTITION_SUFFIX = "-00";
+  public static final String ICEBERG_PARTITION_HOUR = "iceberg.partition.hour"; // specific hour (0-23) used with hourly/format-based partition, defaults to 0
+  /**
+   * Optional {@link DateTimeFormatter} pattern controlling how the partition value is rendered.
+   * The input date ({@code iceberg.filter.date}) is always supplied in {@code yyyy-MM-dd} form;
+   * this pattern governs the <em>output</em> string used in the filter expression.
+   *
+   * <p>Examples:
+   * <ul>
+   *   <li>{@code yyyy-MM-dd}      → {@code 2025-04-01}      (daily, no hour)</li>
+   *   <li>{@code yyyy-MM-dd-HH}   → {@code 2025-04-01-05}   (hourly, hour from {@code iceberg.partition.hour})</li>
+   *   <li>{@code dd-MM-yyyy-HH}   → {@code 01-04-2025-00}   (reversed-date hourly)</li>
+   *   <li>{@code yyyyMMdd}        → {@code 20250401}         (compact daily)</li>
+   * </ul>
+   *
+   * <p>When this property is set it supersedes {@code iceberg.hourly.partition.enabled}.
+   * When absent the legacy {@code iceberg.hourly.partition.enabled} / {@code iceberg.partition.hour}
+   * behaviour is preserved for backward compatibility.
+   */
+  public static final String ICEBERG_PARTITION_VALUE_FORMAT = "iceberg.partition.value.format";

Review Comment:
   Since this is only applied when the column type is datepartition, let's 
rename the config to `"iceberg.partition.value.datetime.format"`.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -80,23 +82,39 @@
  * iceberg.table.name=table1
  * iceberg.catalog.uri=ICEBERG_CATALOG_URI
  *
- * # Partition filtering with lookback - Static date (hourly partitions by default)
+ * # Partition filtering with lookback - Static date
  * iceberg.filter.enabled=true
- * iceberg.partition.column=datepartition  # Optional, defaults to "datepartition"
- * iceberg.filter.date=2025-04-01         # Date value (yyyy-MM-dd format)
+ * iceberg.partition.column=datepartition         # Optional, defaults to "datepartition"
+ * iceberg.filter.date=2025-04-01                 # Input date always in yyyy-MM-dd format
  * iceberg.lookback.days=3
- * # iceberg.hourly.partition.enabled=true  # Optional, defaults to true; generates yyyy-MM-dd-00 format
  *
- * # Partition filtering with lookback - Scheduled flows (dynamic date)
+ * # Partition filtering - Scheduled flows (dynamic date)
  * iceberg.filter.enabled=true
- * iceberg.filter.date=CURRENT_DATE       # Uses current date at runtime
+ * iceberg.filter.date=CURRENT_DATE               # Resolved to current date at runtime
  * iceberg.lookback.days=3
  *
- * # Standard daily partitions (disable hourly suffix if table uses yyyy-MM-dd format)
- * iceberg.filter.enabled=true
- * iceberg.filter.date=2025-04-01         # Date in yyyy-MM-dd format
- * iceberg.hourly.partition.enabled=false # Set false for non-hourly partitions
- * iceberg.lookback.days=3
+ * # --- Recommended: configurable partition value format ---
+ * # iceberg.partition.value.format is a DateTimeFormatter pattern applied to the output
+ * # partition value used in the filter expression. The input date is always yyyy-MM-dd.
+ * # When set, it supersedes iceberg.hourly.partition.enabled.
+ *
+ * # Standard hourly partitions (yyyy-MM-dd-HH)
+ * iceberg.partition.value.format=yyyy-MM-dd-HH   # → "2025-04-01-05"
+ * iceberg.partition.hour=5                        # Hour 0-23, default 0

Review Comment:
   More specifically, since users schedule flows, they might not know the hour 
at which their data gets generated. For one-time use cases, we can add another 
way, such as filter.column and filter.value, which is applied directly.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -80,23 +82,39 @@
  * iceberg.table.name=table1
  * iceberg.catalog.uri=ICEBERG_CATALOG_URI
  *
- * # Partition filtering with lookback - Static date (hourly partitions by default)
+ * # Partition filtering with lookback - Static date
  * iceberg.filter.enabled=true
- * iceberg.partition.column=datepartition  # Optional, defaults to "datepartition"
- * iceberg.filter.date=2025-04-01         # Date value (yyyy-MM-dd format)
+ * iceberg.partition.column=datepartition         # Optional, defaults to "datepartition"
+ * iceberg.filter.date=2025-04-01                 # Input date always in yyyy-MM-dd format
  * iceberg.lookback.days=3
- * # iceberg.hourly.partition.enabled=true  # Optional, defaults to true; generates yyyy-MM-dd-00 format
  *
- * # Partition filtering with lookback - Scheduled flows (dynamic date)
+ * # Partition filtering - Scheduled flows (dynamic date)
  * iceberg.filter.enabled=true
- * iceberg.filter.date=CURRENT_DATE       # Uses current date at runtime
+ * iceberg.filter.date=CURRENT_DATE               # Resolved to current date at runtime
  * iceberg.lookback.days=3
  *
- * # Standard daily partitions (disable hourly suffix if table uses yyyy-MM-dd format)
- * iceberg.filter.enabled=true
- * iceberg.filter.date=2025-04-01         # Date in yyyy-MM-dd format
- * iceberg.hourly.partition.enabled=false # Set false for non-hourly partitions
- * iceberg.lookback.days=3
+ * # --- Recommended: configurable partition value format ---
+ * # iceberg.partition.value.format is a DateTimeFormatter pattern applied to the output
+ * # partition value used in the filter expression. The input date is always yyyy-MM-dd.
+ * # When set, it supersedes iceberg.hourly.partition.enabled.
+ *
+ * # Standard hourly partitions (yyyy-MM-dd-HH)
+ * iceberg.partition.value.format=yyyy-MM-dd-HH   # → "2025-04-01-05"
+ * iceberg.partition.hour=5                        # Hour 0-23, default 0

Review Comment:
   What is the use of this config? In the date-based filter we have 
CURRENT_DATE as a placeholder; should we have a similar CURRENT_DATE + 
CURRENT_HOUR if the pattern is yyyy-MM-dd-HH?
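
One way to read this suggestion is sketched below: the `CURRENT_DATE` placeholder is resolved from the current date and hour and rendered with the configured pattern, so no separate hour setting is needed. This is only a sketch under assumptions: the class and method names are hypothetical, the `Clock` parameter is added so the result is testable, and UTC is an arbitrary choice of zone in the usage.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;

// Hypothetical helper; not part of the PR.
class CurrentDateTimeResolver {

  static final String CURRENT_DATE_PLACEHOLDER = "CURRENT_DATE";

  /**
   * Returns filterValue unchanged unless it is the placeholder, in which case the
   * current date-time (truncated to the hour) is rendered with the given pattern.
   */
  public static String resolve(String filterValue, String pattern, Clock clock) {
    if (!CURRENT_DATE_PLACEHOLDER.equals(filterValue)) {
      return filterValue;
    }
    LocalDateTime now = LocalDateTime.now(clock).truncatedTo(ChronoUnit.HOURS);
    return now.format(DateTimeFormatter.ofPattern(pattern));
  }

  public static void main(String[] args) {
    // Fixed clock so the output is deterministic: 2025-04-01 05:30 UTC.
    Clock clock = Clock.fixed(Instant.parse("2025-04-01T05:30:00Z"), ZoneOffset.UTC);

    System.out.println(resolve("CURRENT_DATE", "yyyy-MM-dd-HH", clock)); // 2025-04-01-05
    System.out.println(resolve("CURRENT_DATE", "yyyy-MM-dd", clock));    // 2025-04-01
    System.out.println(resolve("2025-03-31", "yyyy-MM-dd", clock));      // 2025-03-31
  }
}
```

Because the hour comes from the clock only when the pattern contains an hour field, a date-only pattern yields the same value as the existing date placeholder.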



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergSource.java:
##########
@@ -286,62 +335,112 @@ private List<IcebergTable.FilePathWithPartition> discoverPartitionFilePaths(Sour
       log.info("Resolved {} placeholder to current date: {}", CURRENT_DATE_PLACEHOLDER, dateValue);
     }
 
-    // Apply lookback period for date partitions
-    // lookbackDays=1 (default) means copy only the specified date
-    // lookbackDays=3 means copy specified date + 2 previous days (total 3 days)
-    int lookbackDays = state.getPropAsInt(ICEBERG_LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
-    List<String> values = Lists.newArrayList();
+    // Resolve hour (0-23) — shared by both daily and hourly lookback paths.
+    int hour = 0;
+    if (state.contains(ICEBERG_PARTITION_HOUR)) {
+      hour = state.getPropAsInt(ICEBERG_PARTITION_HOUR, 0);
+      Preconditions.checkArgument(hour >= 0 && hour <= 23,
+        String.format("iceberg.partition.hour must be between 0 and 23, got: %d", hour));
+    }
+
+    // Resolve the DateTimeFormatter used to render each partition value.
+    // resolvePartitionFormatter normalises both the new iceberg.partition.value.format path
+    // and the legacy iceberg.hourly.partition.enabled path into a single formatter.
+    DateTimeFormatter partitionFormatter = resolvePartitionFormatter(state, hour);
 
-    if (lookbackDays >= 1) {
-      log.info("Applying lookback period of {} days for date partition column '{}': {}", lookbackDays, datePartitionColumn, dateValue);
+    // Parse the input date — always expected in canonical yyyy-MM-dd form.
+    LocalDate startDate;
+    try {
+      startDate = LocalDate.parse(dateValue);
+    } catch (java.time.format.DateTimeParseException e) {
Review Comment:
   This should be in the format passed through the config so that it is 
consistent. We can remove the partition hour config, and CURRENT_DATE can be 
resolved using LocalDateTime in the given pattern.
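
A minimal sketch of what parsing the input with the configured pattern could look like, assuming the same pattern is used for both input and output. The class and method names are hypothetical. The hour defaults to 0 when the pattern has no hour field, which is an assumption for illustration and not behaviour confirmed by the PR.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;

// Hypothetical helper; not part of the PR.
class PatternAwareDateParser {

  /**
   * Parses value with the configured pattern. Works for date-only patterns
   * (hour assumed to be 0) and for patterns that carry an hour field.
   * Throws java.time.format.DateTimeParseException if value does not match.
   */
  public static LocalDateTime parseStart(String value, String pattern) {
    TemporalAccessor parsed = DateTimeFormatter.ofPattern(pattern).parse(value);
    LocalDate date = LocalDate.from(parsed);
    int hour = parsed.isSupported(ChronoField.HOUR_OF_DAY)
        ? parsed.get(ChronoField.HOUR_OF_DAY)
        : 0;
    return date.atTime(hour, 0);
  }

  public static void main(String[] args) {
    System.out.println(parseStart("2025-04-01", "yyyy-MM-dd"));       // 2025-04-01T00:00
    System.out.println(parseStart("2025-04-01-05", "yyyy-MM-dd-HH")); // 2025-04-01T05:00
    System.out.println(parseStart("01-04-2025-00", "dd-MM-yyyy-HH")); // 2025-04-01T00:00
  }
}
```

Parsing to a `TemporalAccessor` first avoids the failure that `LocalDateTime.parse` would raise for a date-only pattern, since that pattern supplies no time fields.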



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
