This is an automated email from the ASF dual-hosted git repository. abhishekrb pushed a commit to branch 29.0.0 in repository https://gitbox.apache.org/repos/asf/druid.git
commit b44bd34762a380fb04235e48600e8c265ebc5e65 Author: Atul Mohan <[email protected]> AuthorDate: Thu Feb 1 23:32:30 2024 -0800 Add range filtering support for iceberg ingestion (#15782) * Add range filtering support for iceberg ingestion * Docs formatting * Spelling --- docs/ingestion/input-sources.md | 11 +++ .../druid-iceberg-extensions/pom.xml | 4 - .../apache/druid/iceberg/filter/IcebergFilter.java | 3 +- .../druid/iceberg/filter/IcebergRangeFilter.java | 93 ++++++++++++++++++++++ .../iceberg/filter/IcebergRangeFilterTest.java | 68 ++++++++++++++++ website/.spelling | 2 + 6 files changed, 176 insertions(+), 5 deletions(-) diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md index d4693e7925a..0adc001cd23 100644 --- a/docs/ingestion/input-sources.md +++ b/docs/ingestion/input-sources.md @@ -1013,6 +1013,17 @@ This input source provides the following filters: `and`, `equals`, `interval`, a |type|Set this value to `not`.|yes| |filter|The iceberg filter on which logical NOT is applied|yes| +`range` Filter: + +|Property|Description|Default|Required| +|--------|-----------|-------|--------| +|type|Set this value to `range`.|None|yes| +|filterColumn|The column name from the Iceberg table schema on which to apply the range filter.|None|yes| +|lower|Lower bound value to match.|None|no. At least one of `lower` or `upper` must not be null.| +|upper|Upper bound value to match.|None|no. At least one of `lower` or `upper` must not be null.| +|lowerOpen|Boolean indicating whether the lower bound is open (exclusive) in the interval of values defined by the range (">" instead of ">="). |false|no| +|upperOpen|Boolean indicating whether the upper bound is open (exclusive) in the interval of values defined by the range ("<" instead of "<="). 
|false|no| + ## Delta Lake input source :::info diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml index 577d56978bc..d5be8235d4a 100644 --- a/extensions-contrib/druid-iceberg-extensions/pom.xml +++ b/extensions-contrib/druid-iceberg-extensions/pom.xml @@ -206,10 +206,6 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-reload4j</artifactId> </exclusion> - <exclusion> - <groupId>com.google.re2j</groupId> - <artifactId>re2j</artifactId> - </exclusion> <exclusion> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java index 10c07cdbe24..cff8797d60b 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java @@ -33,7 +33,8 @@ import org.apache.iceberg.expressions.Expression; @JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class), @JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class), @JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class), - @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class) + @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class), + @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class) }) public interface IcebergFilter { diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java new file mode 100644 index 00000000000..2e3f42dbba9 --- /dev/null +++ 
b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.filter; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; + +public class IcebergRangeFilter implements IcebergFilter +{ + @JsonProperty + private final String filterColumn; + @JsonProperty + private final Boolean lowerOpen; + @JsonProperty + private final Boolean upperOpen; + @JsonProperty + private final Object lower; + @JsonProperty + private final Object upper; + + @JsonCreator + public IcebergRangeFilter( + @JsonProperty("filterColumn") String filterColumn, + @JsonProperty("lower") @Nullable Object lower, + @JsonProperty("upper") @Nullable Object upper, + @JsonProperty("lowerOpen") @Nullable Boolean lowerOpen, + @JsonProperty("upperOpen") @Nullable Boolean 
upperOpen + ) + { + Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the range filter"); + Preconditions.checkArgument(lower != null || upper != null, "Both lower and upper bounds cannot be empty"); + this.filterColumn = filterColumn; + this.lowerOpen = lowerOpen != null ? lowerOpen : false; + this.upperOpen = upperOpen != null ? upperOpen : false; + this.lower = lower; + this.upper = upper; + } + + + @Override + public TableScan filter(TableScan tableScan) + { + return tableScan.filter(getFilterExpression()); + } + + @Override + public Expression getFilterExpression() + { + List<Expression> expressions = new ArrayList<>(); + + if (lower != null) { + Expression lowerExp = lowerOpen + ? Expressions.greaterThan(filterColumn, lower) + : Expressions.greaterThanOrEqual(filterColumn, lower); + expressions.add(lowerExp); + } + if (upper != null) { + Expression upperExp = upperOpen + ? Expressions.lessThan(filterColumn, upper) + : Expressions.lessThanOrEqual(filterColumn, upper); + expressions.add(upperExp); + } + if (expressions.size() == 2) { + return Expressions.and(expressions.get(0), expressions.get(1)); + } + return expressions.get(0); + } +} diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java new file mode 100644 index 00000000000..c85ef7c8feb --- /dev/null +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.filter; + +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.junit.Assert; +import org.junit.Test; + +public class IcebergRangeFilterTest +{ + private final String TEST_COLUMN = "column1"; + + @Test + public void testUpperOpenFilter() + { + Expression expectedExpression = Expressions.and( + Expressions.greaterThanOrEqual(TEST_COLUMN, 45), + Expressions.lessThan(TEST_COLUMN, 50) + ); + IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, false, true); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } + + @Test + public void testLowerOpenFilter() + { + Expression expectedExpression = Expressions.and( + Expressions.greaterThan(TEST_COLUMN, 45), + Expressions.lessThanOrEqual(TEST_COLUMN, 50) + ); + IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, true, false); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } + + @Test + public void testNoLowerFilter() + { + Expression expectedExpression = Expressions.lessThanOrEqual(TEST_COLUMN, 50); + IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, null, 50, null, false); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } 
+ + @Test + public void testNoUpperFilter() + { + Expression expectedExpression = Expressions.greaterThanOrEqual(TEST_COLUMN, 100); + IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 100, null, null, null); + Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString()); + } +} diff --git a/website/.spelling b/website/.spelling index 227e97160fb..422d8a69a9b 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1306,9 +1306,11 @@ kafka.topic keyColumnName keyFormat listDelimiter +lowerOpen timestamp timestampColumnName timestampSpec +upperOpen urls valueFormat 1GB --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
