This is an automated email from the ASF dual-hosted git repository.
atul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b1e398bcc6c Add time window filter to Iceberg reader (#18531)
b1e398bcc6c is described below
commit b1e398bcc6ccffcf7373f4644faaf858f9f51b0f
Author: Atul Mohan <[email protected]>
AuthorDate: Wed Sep 17 15:37:03 2025 -0700
Add time window filter to Iceberg reader (#18531)
* Add time window filter to Iceberg reader
* Fix checkstyle
* Code scan fix
* Spelling
* Add type info in docs
---
docs/ingestion/input-sources.md | 12 ++-
.../apache/druid/iceberg/filter/IcebergFilter.java | 3 +-
.../iceberg/filter/IcebergTimeWindowFilter.java | 92 ++++++++++++++++++++++
.../filter/IcebergTimeWindowFilterTest.java | 62 +++++++++++++++
website/.spelling | 5 ++
5 files changed, 172 insertions(+), 2 deletions(-)
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index d4698b20f03..be9a7456052 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -1119,7 +1119,7 @@ Sample:
### Iceberg filter object
-This input source provides the following filters: `and`, `equals`, `interval`,
and `or`. You can use these filters to filter out data files from a snapshot,
reducing the number of files Druid has to ingest.
+This input source provides the following filters: `and`, `equals`, `interval`,
`timeWindow`, `range` and `or`. You can use these filters to filter out data
files from a snapshot, reducing the number of files Druid has to ingest.
It is strongly recommended to apply filtering only on Iceberg partition
columns. When filtering on non-partition columns, Iceberg filters may return
rows that do not fully match the expression. To address this, it may help to
define an additional filter in the
[`transformSpec`](./ingestion-spec.md#transformspec) to remove residual rows.
`equals` Filter:
@@ -1170,6 +1170,16 @@ It is strongly recommended to apply filtering only on
Iceberg partition columns.
|lowerOpen|Boolean indicating if lower bound is open in the interval of values
defined by the range (`>` instead of `>=`). |false|no|
|upperOpen|Boolean indicating if upper bound is open on the interval of values
defined by range (`<` instead of `<=`). |false|no|
+`timeWindow` Filter:
+
+| Property|Description|Default| Required |
+|---------|-----------|-------|----------|
+|type| Set this value to `timeWindow`.|None|yes|
+|filterColumn|The name of the column in the Iceberg table schema to filter on.
The filter column must be defined as TimestampType in Iceberg.|None|yes|
+|baseTime|Determines the reference timestamp from which the lookback and
lookahead durations are applied to define the time window.|Current UTC
timestamp|no|
+|lookbackDuration|The duration that determines how far back from `baseTime`
the filter includes data.|P1D|no|
+|lookaheadDuration|The duration that determines how far ahead of `baseTime`
the filter includes data.|Zero|no|
+
## Delta Lake input source
:::info[Required extension]
diff --git
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
index cff8797d60b..b47f0fc92a0 100644
---
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
+++
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
@@ -34,7 +34,8 @@ import org.apache.iceberg.expressions.Expression;
@JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class),
@JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class),
@JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class),
- @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class)
+ @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class),
+ @JsonSubTypes.Type(name = "timeWindow", value =
IcebergTimeWindowFilter.class)
})
public interface IcebergFilter
{
diff --git
a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
new file mode 100644
index 00000000000..29ca18752f3
--- /dev/null
+++
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+/**
+ * Iceberg filter that restricts a scan to a time window on {@code filterColumn}.
+ *
+ * <p>The window is {@code [baseTime - lookbackDuration, baseTime + lookaheadDuration]},
+ * with both bounds inclusive. Defaults applied by the constructor when a value is
+ * not supplied: {@code lookbackDuration} is one day ({@code P1D}),
+ * {@code lookaheadDuration} is zero, and {@code baseTime} is the current UTC time
+ * at construction.
+ */
+public class IcebergTimeWindowFilter implements IcebergFilter
+{
+  // Iceberg TimestampType uses microsecond precision, Joda-Time uses milliseconds.
+  private static final long MICROS_PER_MILLI = 1000L;
+
+  @JsonProperty
+  private final String filterColumn;
+
+  @JsonProperty
+  private final Duration lookbackDuration;
+
+  @JsonProperty
+  private final Duration lookaheadDuration;
+
+  @JsonProperty
+  private final DateTime baseTime;
+
+  /**
+   * @param filterColumn      name of the column to filter on; must not be null
+   * @param lookbackDuration  how far before {@code baseTime} the window starts; defaults to P1D when null
+   * @param lookaheadDuration how far after {@code baseTime} the window ends; defaults to zero when null
+   * @param baseTime          reference timestamp of the window; defaults to the current UTC time when null
+   * @throws NullPointerException if {@code filterColumn} is null
+   */
+  @JsonCreator
+  public IcebergTimeWindowFilter(
+      @JsonProperty("filterColumn") String filterColumn,
+      @JsonProperty("lookbackDuration") Duration lookbackDuration,
+      @JsonProperty("lookaheadDuration") Duration lookaheadDuration,
+      @JsonProperty("baseTime") DateTime baseTime
+  )
+  {
+    Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the timeWindow filter");
+    this.filterColumn = filterColumn;
+    this.lookbackDuration = Configs.valueOrDefault(lookbackDuration, new Period("P1D").toStandardDuration());
+    this.lookaheadDuration = Configs.valueOrDefault(lookaheadDuration, Duration.ZERO);
+    this.baseTime = Configs.valueOrDefault(baseTime, DateTimes.nowUtc());
+  }
+
+  /**
+   * Applies the time window expression to the given scan.
+   *
+   * @param tableScan scan to filter
+   * @return the scan returned by {@code tableScan.filter(...)} for this filter's expression
+   */
+  @Override
+  public TableScan filter(TableScan tableScan)
+  {
+    return tableScan.filter(getFilterExpression());
+  }
+
+  /**
+   * Builds {@code filterColumn >= windowStart AND filterColumn <= windowEnd}, where the
+   * window bounds are expressed in microseconds.
+   *
+   * @return the conjunction of the lower and upper bound predicates
+   * @throws ArithmeticException if a window bound cannot be represented as a {@code long}
+   *                             number of microseconds (previously this wrapped silently
+   *                             and produced an incorrect bound)
+   */
+  @Override
+  public Expression getFilterExpression()
+  {
+    final long baseMillis = baseTime.getMillis();
+    // Exact arithmetic: a silently wrapped bound would select the wrong data files.
+    final long windowStartMicros = toMicros(Math.subtractExact(baseMillis, lookbackDuration.getMillis()));
+    final long windowEndMicros = toMicros(Math.addExact(baseMillis, lookaheadDuration.getMillis()));
+    return Expressions.and(
+        Expressions.greaterThanOrEqual(
+            filterColumn,
+            Literal.of(windowStartMicros)
+                   .to(Types.TimestampType.withZone())
+                   .value()
+        ),
+        Expressions.lessThanOrEqual(
+            filterColumn,
+            Literal.of(windowEndMicros)
+                   .to(Types.TimestampType.withZone())
+                   .value()
+        )
+    );
+  }
+
+  /**
+   * Converts epoch milliseconds to epoch microseconds, failing on overflow.
+   */
+  private static long toMicros(long millis)
+  {
+    return Math.multiplyExact(millis, MICROS_PER_MILLI);
+  }
+}
diff --git
a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
new file mode 100644
index 00000000000..51df6c95ac9
--- /dev/null
+++
b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link IcebergTimeWindowFilter}.
+ */
+public class IcebergTimeWindowFilterTest
+{
+  /**
+   * A filter with a two-day lookback and a one-day lookahead must produce an inclusive
+   * range expression whose bounds are the base time shifted by those durations,
+   * expressed in microseconds.
+   */
+  @Test
+  public void testFilter()
+  {
+    final String timeColumn = "eventTime";
+    final DateTime baseTime = DateTimes.nowUtc();
+    final Duration lookback = new Period("P2D").toStandardDuration();
+    final Duration lookahead = new Period("P1D").toStandardDuration();
+
+    final IcebergTimeWindowFilter timeWindowFilter =
+        new IcebergTimeWindowFilter(timeColumn, lookback, lookahead, baseTime);
+
+    // Expected bounds, computed independently of the filter and scaled from millis to micros.
+    final long expectedLowerMicros = (baseTime.getMillis() - Duration.standardDays(2L).getMillis()) * 1000;
+    final long expectedUpperMicros = (baseTime.getMillis() + Duration.standardDays(1L).getMillis()) * 1000;
+
+    final Expression lowerBound = Expressions.greaterThanOrEqual(
+        timeColumn,
+        Literal.of(expectedLowerMicros)
+               .to(Types.TimestampType.withZone())
+               .value()
+    );
+    final Expression upperBound = Expressions.lessThanOrEqual(
+        timeColumn,
+        Literal.of(expectedUpperMicros)
+               .to(Types.TimestampType.withZone())
+               .value()
+    );
+    final Expression expected = Expressions.and(lowerBound, upperBound);
+
+    // Expressions are compared through their string form.
+    Assert.assertEquals(expected.toString(), timeWindowFilter.getFilterExpression().toString());
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 610d82db1fb..767ef4249ab 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -286,6 +286,7 @@ backfills
backfilled
backpressure
base64
+baseTime
big-endian
bigint
blkio
@@ -409,7 +410,10 @@ localhost
log4j
log4j2
log4j2.xml
+lookahead
+lookaheadDuration
lookback
+lookbackDuration
lookups
mapreduce
masse
@@ -1348,6 +1352,7 @@ listDelimiter
lowerOpen
timestamp
timestampColumnName
+TimestampType
timestampSpec
upperOpen
urls
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]