This is an automated email from the ASF dual-hosted git repository.

atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b1e398bcc6c Add time window filter to Iceberg reader (#18531)
b1e398bcc6c is described below

commit b1e398bcc6ccffcf7373f4644faaf858f9f51b0f
Author: Atul Mohan <[email protected]>
AuthorDate: Wed Sep 17 15:37:03 2025 -0700

    Add time window filter to Iceberg reader (#18531)
    
    * Add time window filter to Iceberg reader
    
    * Fix checkstyle
    
    * Code scan fix
    
    * Spelling
    
    * Add type info in docs
---
 docs/ingestion/input-sources.md                    | 12 ++-
 .../apache/druid/iceberg/filter/IcebergFilter.java |  3 +-
 .../iceberg/filter/IcebergTimeWindowFilter.java    | 92 ++++++++++++++++++++++
 .../filter/IcebergTimeWindowFilterTest.java        | 62 +++++++++++++++
 website/.spelling                                  |  5 ++
 5 files changed, 172 insertions(+), 2 deletions(-)

diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index d4698b20f03..be9a7456052 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -1119,7 +1119,7 @@ Sample:
 
 ### Iceberg filter object
 
-This input source provides the following filters: `and`, `equals`, `interval`, 
and `or`. You can use these filters to filter out data files from a snapshot, 
reducing the number of files Druid has to ingest.
+This input source provides the following filters: `and`, `equals`, `interval`, `not`, `or`, `range`, and `timeWindow`. You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest.
 It is strongly recommended to apply filtering only on Iceberg partition 
columns. When filtering on non-partition columns, Iceberg filters may return 
rows that do not fully match the expression. To address this, it may help to 
define an additional filter in the 
[`transformSpec`](./ingestion-spec.md#transformspec) to remove residual rows.
 
 `equals` Filter:
@@ -1170,6 +1170,16 @@ It is strongly recommended to apply filtering only on 
Iceberg partition columns.
 |lowerOpen|Boolean indicating if lower bound is open in the interval of values 
defined by the range (`>` instead of `>=`). |false|no|
 |upperOpen|Boolean indicating if upper bound is open on the interval of values 
defined by range (`<` instead of `<=`). |false|no|
 
+`timeWindow` Filter:
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|--------|
+|type|Set this value to `timeWindow`.|None|yes|
+|filterColumn|Name of the column in the Iceberg table schema to filter on. The column must be defined as `TimestampType` in Iceberg.|None|yes|
+|baseTime|Reference timestamp from which `lookbackDuration` and `lookaheadDuration` are measured to define the time window.|Current UTC timestamp|no|
+|lookbackDuration|How far before `baseTime` the time window starts. The filter includes data with timestamps at or after `baseTime` minus this duration.|P1D|no|
+|lookaheadDuration|How far after `baseTime` the time window ends. The filter includes data with timestamps at or before `baseTime` plus this duration.|Zero|no|
+
 ## Delta Lake input source
 
 :::info[Required extension]
diff --git 
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
index cff8797d60b..b47f0fc92a0 100644
--- 
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
+++ 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
@@ -34,7 +34,8 @@ import org.apache.iceberg.expressions.Expression;
     @JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class),
     @JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class),
     @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class),
-    @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class)
+    @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class),
+    @JsonSubTypes.Type(name = "timeWindow", value = 
IcebergTimeWindowFilter.class)
 })
 public interface IcebergFilter
 {
diff --git 
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
new file mode 100644
index 00000000000..29ca18752f3
--- /dev/null
+++ 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+
+/**
+ * An {@link IcebergFilter} that matches rows whose {@code filterColumn} lies in the closed time window
+ * {@code [baseTime - lookbackDuration, baseTime + lookaheadDuration]}.
+ *
+ * <p>The filter column is expected to be an Iceberg {@code TimestampType} column. Iceberg timestamps have
+ * microsecond precision, so the millisecond-precision Joda bounds are scaled to microseconds.
+ */
+public class IcebergTimeWindowFilter implements IcebergFilter
+{
+  /** Lookback used when {@code lookbackDuration} is not specified. */
+  private static final Duration DEFAULT_LOOKBACK_DURATION = Duration.standardDays(1);
+
+  private static final long MICROS_PER_MILLI = 1000L;
+
+  @JsonProperty
+  private final String filterColumn;
+
+  @JsonProperty
+  private final Duration lookbackDuration;
+
+  @JsonProperty
+  private final Duration lookaheadDuration;
+
+  @JsonProperty
+  private final DateTime baseTime;
+
+  /**
+   * @param filterColumn      name of the timestamp column to filter on; required
+   * @param lookbackDuration  how far before {@code baseTime} the window starts; defaults to one day
+   * @param lookaheadDuration how far after {@code baseTime} the window ends; defaults to zero
+   * @param baseTime          reference point of the window; defaults to the current UTC time at construction
+   *
+   * @throws NullPointerException     if {@code filterColumn} is null
+   * @throws IllegalArgumentException if the configured durations make the window end before it starts
+   */
+  @JsonCreator
+  public IcebergTimeWindowFilter(
+      @JsonProperty("filterColumn") String filterColumn,
+      @JsonProperty("lookbackDuration") Duration lookbackDuration,
+      @JsonProperty("lookaheadDuration") Duration lookaheadDuration,
+      @JsonProperty("baseTime") DateTime baseTime
+  )
+  {
+    this.filterColumn = Preconditions.checkNotNull(
+        filterColumn,
+        "You must specify a filter column on the timeWindow filter"
+    );
+    this.lookbackDuration = Configs.valueOrDefault(lookbackDuration, DEFAULT_LOOKBACK_DURATION);
+    this.lookaheadDuration = Configs.valueOrDefault(lookaheadDuration, Duration.ZERO);
+    // Resolved once here so that every call to getFilterExpression() uses the same window.
+    this.baseTime = Configs.valueOrDefault(baseTime, DateTimes.nowUtc());
+
+    // Negative durations can invert the window; fail fast instead of silently matching no data files.
+    final DateTime windowStart = getWindowStart();
+    final DateTime windowEnd = getWindowEnd();
+    Preconditions.checkArgument(
+        !windowStart.isAfter(windowEnd),
+        "timeWindow filter on column [%s] has window start [%s] after window end [%s]",
+        filterColumn,
+        windowStart,
+        windowEnd
+    );
+  }
+
+  @Override
+  public TableScan filter(TableScan tableScan)
+  {
+    return tableScan.filter(getFilterExpression());
+  }
+
+  /**
+   * Builds {@code filterColumn >= windowStart AND filterColumn <= windowEnd}, where both bounds are Iceberg
+   * timestamp values in microseconds since the epoch.
+   */
+  @Override
+  public Expression getFilterExpression()
+  {
+    final long windowStartMicros = toIcebergTimestampMicros(getWindowStart());
+    final long windowEndMicros = toIcebergTimestampMicros(getWindowEnd());
+    return Expressions.and(
+        Expressions.greaterThanOrEqual(filterColumn, windowStartMicros),
+        Expressions.lessThanOrEqual(filterColumn, windowEndMicros)
+    );
+  }
+
+  /** Inclusive lower bound of the time window: {@code baseTime - lookbackDuration}. */
+  private DateTime getWindowStart()
+  {
+    return baseTime.minus(lookbackDuration);
+  }
+
+  /** Inclusive upper bound of the time window: {@code baseTime + lookaheadDuration}. */
+  private DateTime getWindowEnd()
+  {
+    return baseTime.plus(lookaheadDuration);
+  }
+
+  /**
+   * Converts a millisecond-precision timestamp into the microsecond value Iceberg uses for {@code TimestampType}.
+   * {@link Math#multiplyExact} throws on overflow instead of silently wrapping.
+   */
+  private static long toIcebergTimestampMicros(DateTime timestamp)
+  {
+    final long micros = Math.multiplyExact(timestamp.getMillis(), MICROS_PER_MILLI);
+    final Literal<Long> literal = Literal.of(micros).to(Types.TimestampType.withZone());
+    return literal.value();
+  }
+}
diff --git 
a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
 
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
new file mode 100644
index 00000000000..51df6c95ac9
--- /dev/null
+++ 
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IcebergTimeWindowFilterTest
+{
+  private static final String FILTER_COLUMN = "eventTime";
+  // A fixed base time keeps the expected expressions deterministic across runs.
+  private static final DateTime BASE_TIME = DateTimes.of("2025-01-15T12:00:00Z");
+
+  @Test
+  public void testFilter()
+  {
+    IcebergTimeWindowFilter filter = new IcebergTimeWindowFilter(
+        FILTER_COLUMN,
+        new Period("P2D").toStandardDuration(),
+        new Period("P1D").toStandardDuration(),
+        BASE_TIME
+    );
+    Expression expected = expectedWindow(
+        BASE_TIME.getMillis() - Duration.standardDays(2L).getMillis(),
+        BASE_TIME.getMillis() + Duration.standardDays(1L).getMillis()
+    );
+    Assert.assertEquals(expected.toString(), filter.getFilterExpression().toString());
+  }
+
+  @Test
+  public void testDefaultDurations()
+  {
+    // lookbackDuration defaults to one day and lookaheadDuration to zero, so the window ends at baseTime.
+    IcebergTimeWindowFilter filter = new IcebergTimeWindowFilter(FILTER_COLUMN, null, null, BASE_TIME);
+    Expression expected = expectedWindow(
+        BASE_TIME.getMillis() - Duration.standardDays(1L).getMillis(),
+        BASE_TIME.getMillis()
+    );
+    Assert.assertEquals(expected.toString(), filter.getFilterExpression().toString());
+  }
+
+  @Test
+  public void testMissingFilterColumn()
+  {
+    Assert.assertThrows(
+        NullPointerException.class,
+        () -> new IcebergTimeWindowFilter(null, null, null, BASE_TIME)
+    );
+  }
+
+  /**
+   * Builds {@code column >= start AND column <= end}, converting the millisecond bounds to Iceberg
+   * timestamp microseconds in the same way the filter under test is expected to.
+   */
+  private static Expression expectedWindow(long startMillis, long endMillis)
+  {
+    return Expressions.and(
+        Expressions.greaterThanOrEqual(
+            FILTER_COLUMN,
+            Literal.of(startMillis * 1000L).to(Types.TimestampType.withZone()).value()
+        ),
+        Expressions.lessThanOrEqual(
+            FILTER_COLUMN,
+            Literal.of(endMillis * 1000L).to(Types.TimestampType.withZone()).value()
+        )
+    );
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 610d82db1fb..767ef4249ab 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -286,6 +286,7 @@ backfills
 backfilled
 backpressure
 base64
+baseTime
 big-endian
 bigint
 blkio
@@ -409,7 +410,10 @@ localhost
 log4j
 log4j2
 log4j2.xml
+lookahead
+lookaheadDuration
 lookback
+lookbackDuration
 lookups
 mapreduce
 masse
@@ -1348,6 +1352,7 @@ listDelimiter
 lowerOpen
 timestamp
 timestampColumnName
+TimestampType
 timestampSpec
 upperOpen
 urls


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to