This is an automated email from the ASF dual-hosted git repository.

abhishekrb pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git

commit b44bd34762a380fb04235e48600e8c265ebc5e65
Author: Atul Mohan <[email protected]>
AuthorDate: Thu Feb 1 23:32:30 2024 -0800

    Add range filtering support for iceberg ingestion (#15782)
    
    * Add range filtering support for iceberg ingestion
    
    * Docs formatting
    
    * Spelling
---
 docs/ingestion/input-sources.md                    | 11 +++
 .../druid-iceberg-extensions/pom.xml               |  4 -
 .../apache/druid/iceberg/filter/IcebergFilter.java |  3 +-
 .../druid/iceberg/filter/IcebergRangeFilter.java   | 93 ++++++++++++++++++++++
 .../iceberg/filter/IcebergRangeFilterTest.java     | 68 ++++++++++++++++
 website/.spelling                                  |  2 +
 6 files changed, 176 insertions(+), 5 deletions(-)

diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index d4693e7925a..0adc001cd23 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -1013,6 +1013,17 @@ This input source provides the following filters: `and`, `equals`, `interval`, a
 |type|Set this value to `not`.|yes|
 |filter|The iceberg filter on which logical NOT is applied|yes|
 
+`range` Filter:
+
+|Property|Description|Default|Required|
+|--------|-----------|-------|--------|
+|type|Set this value to `range`.|None|yes|
+|filterColumn|The column name from the iceberg table schema based on which range filtering needs to happen.|None|yes|
+|lower|Lower bound value to match.|None|no. At least one of `lower` or `upper` must not be null.|
+|upper|Upper bound value to match.|None|no. At least one of `lower` or `upper` must not be null.|
+|lowerOpen|Boolean indicating if lower bound is open in the interval of values defined by the range (">" instead of ">=").|false|no|
+|upperOpen|Boolean indicating if upper bound is open in the interval of values defined by the range ("<" instead of "<=").|false|no|
+
 ## Delta Lake input source
 
 :::info
diff --git a/extensions-contrib/druid-iceberg-extensions/pom.xml b/extensions-contrib/druid-iceberg-extensions/pom.xml
index 577d56978bc..d5be8235d4a 100644
--- a/extensions-contrib/druid-iceberg-extensions/pom.xml
+++ b/extensions-contrib/druid-iceberg-extensions/pom.xml
@@ -206,10 +206,6 @@
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-reload4j</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>com.google.re2j</groupId>
-          <artifactId>re2j</artifactId>
-        </exclusion>
         <exclusion>
           <groupId>com.google.code.gson</groupId>
           <artifactId>gson</artifactId>
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
index 10c07cdbe24..cff8797d60b 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
@@ -33,7 +33,8 @@ import org.apache.iceberg.expressions.Expression;
     @JsonSubTypes.Type(name = "equals", value = IcebergEqualsFilter.class),
     @JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class),
     @JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class),
-    @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class)
+    @JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class),
+    @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class)
 })
 public interface IcebergFilter
 {
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java
new file mode 100644
index 00000000000..2e3f42dbba9
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergRangeFilter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Iceberg filter that restricts a scan to rows whose {@code filterColumn} value falls within a range.
+ * At least one of {@code lower} and {@code upper} must be set; each bound is inclusive unless the
+ * corresponding {@code lowerOpen} / {@code upperOpen} flag is true.
+ */
+public class IcebergRangeFilter implements IcebergFilter
+{
+  @JsonProperty
+  private final String filterColumn;
+  @JsonProperty
+  private final Boolean lowerOpen;
+  @JsonProperty
+  private final Boolean upperOpen;
+  @JsonProperty
+  private final Object lower;
+  @JsonProperty
+  private final Object upper;
+
+  /**
+   * Creates a range filter on a single column.
+   *
+   * @param filterColumn name of the column the range is applied to; must not be null
+   * @param lower        lower bound value, or null for no lower bound
+   * @param upper        upper bound value, or null for no upper bound
+   * @param lowerOpen    true to exclude the lower bound ({@code >} instead of {@code >=}); null means false
+   * @param upperOpen    true to exclude the upper bound ({@code <} instead of {@code <=}); null means false
+   * @throws NullPointerException     if {@code filterColumn} is null
+   * @throws IllegalArgumentException if both {@code lower} and {@code upper} are null
+   */
+  @JsonCreator
+  public IcebergRangeFilter(
+      @JsonProperty("filterColumn") String filterColumn,
+      @JsonProperty("lower") @Nullable Object lower,
+      @JsonProperty("upper") @Nullable Object upper,
+      @JsonProperty("lowerOpen") @Nullable Boolean lowerOpen,
+      @JsonProperty("upperOpen") @Nullable Boolean upperOpen
+  )
+  {
+    Preconditions.checkNotNull(filterColumn, "You must specify a filter column on the range filter");
+    Preconditions.checkArgument(lower != null || upper != null, "Both lower and upper bounds cannot be empty");
+    this.filterColumn = filterColumn;
+    // Bounds are closed (inclusive) unless explicitly marked open.
+    this.lowerOpen = lowerOpen != null ? lowerOpen : false;
+    this.upperOpen = upperOpen != null ? upperOpen : false;
+    this.lower = lower;
+    this.upper = upper;
+  }
+
+
+  /**
+   * Returns the given scan filtered by {@link #getFilterExpression()}.
+   */
+  @Override
+  public TableScan filter(TableScan tableScan)
+  {
+    return tableScan.filter(getFilterExpression());
+  }
+
+  /**
+   * Builds the Iceberg expression for this range: a single comparison when only one bound is set,
+   * or the conjunction of the lower and upper comparisons when both are set.
+   */
+  @Override
+  public Expression getFilterExpression()
+  {
+    List<Expression> expressions = new ArrayList<>();
+
+    if (lower != null) {
+      Expression lowerExp = lowerOpen
+                            ? Expressions.greaterThan(filterColumn, lower)
+                            : Expressions.greaterThanOrEqual(filterColumn, lower);
+      expressions.add(lowerExp);
+    }
+    if (upper != null) {
+      Expression upperExp = upperOpen
+                            ? Expressions.lessThan(filterColumn, upper)
+                            : Expressions.lessThanOrEqual(filterColumn, upper);
+      expressions.add(upperExp);
+    }
+    if (expressions.size() == 2) {
+      return Expressions.and(expressions.get(0), expressions.get(1));
+    }
+    // The constructor rejects the case where both bounds are null, so one expression is present here.
+    return expressions.get(0);
+  }
+}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java
new file mode 100644
index 00000000000..c85ef7c8feb
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergRangeFilterTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks the Iceberg expressions built by {@link IcebergRangeFilter} for half-open and one-sided ranges.
+ * Expected and actual expressions are compared through their string form.
+ */
+public class IcebergRangeFilterTest
+{
+  private static final String TEST_COLUMN = "column1";
+
+  @Test
+  public void testUpperOpenFilter()
+  {
+    Expression expectedExpression = Expressions.and(
+        Expressions.greaterThanOrEqual(TEST_COLUMN, 45),
+        Expressions.lessThan(TEST_COLUMN, 50)
+    );
+    IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, false, true);
+    Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
+  }
+
+  @Test
+  public void testLowerOpenFilter()
+  {
+    Expression expectedExpression = Expressions.and(
+        Expressions.greaterThan(TEST_COLUMN, 45),
+        Expressions.lessThanOrEqual(TEST_COLUMN, 50)
+    );
+    IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 45, 50, true, false);
+    Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
+  }
+
+  @Test
+  public void testNoLowerFilter()
+  {
+    Expression expectedExpression = Expressions.lessThanOrEqual(TEST_COLUMN, 50);
+    IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, null, 50, null, false);
+    Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
+  }
+
+  @Test
+  public void testNoUpperFilter()
+  {
+    Expression expectedExpression = Expressions.greaterThanOrEqual(TEST_COLUMN, 100);
+    IcebergRangeFilter rangeFilter = new IcebergRangeFilter(TEST_COLUMN, 100, null, null, null);
+    Assert.assertEquals(expectedExpression.toString(), rangeFilter.getFilterExpression().toString());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testNoBoundsThrows()
+  {
+    new IcebergRangeFilter(TEST_COLUMN, null, null, null, null);
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 227e97160fb..422d8a69a9b 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1306,9 +1306,11 @@ kafka.topic
 keyColumnName
 keyFormat
 listDelimiter
+lowerOpen
 timestamp
 timestampColumnName
 timestampSpec
+upperOpen
 urls
 valueFormat
 1GB


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to