This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 779320b  [CARBONDATA-3832]Added block and blocklet pruning for the 
polygon expression processing
779320b is described below

commit 779320b7b70a9af7ee74ff7e25adfc1cc5ebce43
Author: Venu Reddy <[email protected]>
AuthorDate: Sun Mar 22 21:56:45 2020 +0530

    [CARBONDATA-3832]Added block and blocklet pruning for the polygon expression 
processing
    
    Why is this PR needed?
    At present, carbon doesn't do block/blocklet pruning for polygon filter 
queries.
    It does row-level filtering at the carbon layer and returns the result. With this 
approach,
    all the carbon files are scanned irrespective of whether there are any 
matching
    rows in the block. It also has spark overhead to launch many jobs and tasks 
to process them.
    This affects the overall performance of polygon queries.
    
    What changes were proposed in this PR?
    Leveraged the existing block pruning mechanism in carbon to skip the 
unwanted
    blocks, thus reducing the number of splits. And at the 
executor side,
    used blocklet pruning to reduce the number of blocklets to be read and 
scanned.
    
    This closes #3772
---
 .../core/scan/expression/UnknownExpression.java    |   8 ++
 .../carbondata/core/scan/filter/FilterUtil.java    |   9 ++
 .../executer/RowLevelFilterExecuterImpl.java       |   2 +-
 .../geo/scan/expression/PolygonExpression.java     |  30 ++++--
 .../filter/executor/PolygonFilterExecutorImpl.java | 112 +++++++++++++++++++++
 .../scala/org/apache/carbondata/geo/GeoTest.scala  |  59 +++++------
 6 files changed, 185 insertions(+), 35 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
index 2cb26a6..cbea664 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
@@ -19,8 +19,16 @@ package org.apache.carbondata.core.scan.expression;
 
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
 public abstract class UnknownExpression extends Expression {
 
   public abstract List<ColumnExpression> getAllColumnList();
 
+  public FilterExecuter getFilterExecuter(FilterResolverIntf 
filterResolverIntf,
+      SegmentProperties segmentProperties) {
+    return null;
+  }
 }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 748bafd..6f121ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -49,6 +49,7 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.UnknownExpression;
 import org.apache.carbondata.core.scan.expression.conditional.*;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
 import 
org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
@@ -188,6 +189,14 @@ public final class FilterUtil {
           return new FalseFilterExecutor();
         case ROWLEVEL:
         default:
+          if (filterExpressionResolverTree.getFilterExpression() instanceof 
UnknownExpression) {
+            FilterExecuter filterExecuter =
+                ((UnknownExpression) 
filterExpressionResolverTree.getFilterExpression())
+                    .getFilterExecuter(filterExpressionResolverTree, 
segmentProperties);
+            if (filterExecuter != null) {
+              return filterExecuter;
+            }
+          }
           return new RowLevelFilterExecuterImpl(
               ((RowLevelFilterResolverImpl) filterExpressionResolverTree)
                   .getDimColEvaluatorInfoList(),
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index c4d3031..d0429e2 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -70,7 +70,7 @@ public class RowLevelFilterExecuterImpl implements 
FilterExecuter {
   /**
    * it has index at which given dimension is stored in file
    */
-  int[] dimensionChunkIndex;
+  protected int[] dimensionChunkIndex;
 
   /**
    * it has index at which given measure is stored in file.
diff --git 
a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
index ee9971a..953ba48 100644
--- 
a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
+++ 
b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -25,15 +25,20 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
 import org.apache.carbondata.core.scan.expression.UnknownExpression;
 import 
org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import 
org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
 import org.apache.carbondata.core.util.CustomIndex;
+import 
org.apache.carbondata.geo.scan.filter.executor.PolygonFilterExecutorImpl;
 
 /**
  * InPolygon expression processor. It inputs the InPolygon string to the Geo 
implementation's
@@ -46,15 +51,16 @@ public class PolygonExpression extends UnknownExpression 
implements ConditionalE
   private CustomIndex<List<Long[]>> instance;
   private List<Long[]> ranges = new ArrayList<Long[]>();
   private ColumnExpression column;
-  private ExpressionResult trueExpRes;
-  private ExpressionResult falseExpRes;
+  private static final ExpressionResult trueExpRes =
+      new ExpressionResult(DataTypes.BOOLEAN, true);
+  private static final ExpressionResult falseExpRes =
+      new ExpressionResult(DataTypes.BOOLEAN, false);
+
 
   public PolygonExpression(String polygon, String columnName, CustomIndex 
indexInstance) {
     this.polygon = polygon;
     this.instance = indexInstance;
     this.column = new ColumnExpression(columnName, DataTypes.LONG);
-    this.trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
-    this.falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
   }
 
   private void validate(List<Long[]> ranges) {
@@ -79,6 +85,10 @@ public class PolygonExpression extends UnknownExpression 
implements ConditionalE
     }
   }
 
+  public List<Long[]> getRanges() {
+    return ranges;
+  }
+
   private boolean rangeBinarySearch(List<Long[]> ranges, long searchForNumber) 
{
     Long[] range;
     int low = 0, mid, high = ranges.size() - 1;
@@ -147,8 +157,6 @@ public class PolygonExpression extends UnknownExpression 
implements ConditionalE
     instance = (CustomIndex<List<Long[]>>) in.readObject();
     column = (ColumnExpression) in.readObject();
     ranges = new ArrayList<Long[]>();
-    trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
-    falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
   }
 
   @Override
@@ -170,4 +178,14 @@ public class PolygonExpression extends UnknownExpression 
implements ConditionalE
   public List<ExpressionResult> getLiterals() {
     return null;
   }
+
+  @Override
+  public FilterExecuter getFilterExecuter(FilterResolverIntf resolver,
+      SegmentProperties segmentProperties) {
+    assert (resolver instanceof RowLevelFilterResolverImpl);
+    RowLevelFilterResolverImpl rowLevelResolver = (RowLevelFilterResolverImpl) 
resolver;
+    return new 
PolygonFilterExecutorImpl(rowLevelResolver.getDimColEvaluatorInfoList(),
+        rowLevelResolver.getMsrColEvalutorInfoList(), 
rowLevelResolver.getFilterExpresion(),
+        rowLevelResolver.getTableIdentifier(), segmentProperties, null);
+  }
 }
diff --git 
a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
 
b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
new file mode 100644
index 0000000..094dbe8
--- /dev/null
+++ 
b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.filter.executor;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import 
org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl;
+import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import 
org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.geo.scan.expression.PolygonExpression;
+
+/**
+ * Polygon filter executor. Prunes Blocks and Blocklets based on the selected 
ranges of polygon.
+ */
+public class PolygonFilterExecutorImpl extends RowLevelFilterExecuterImpl {
+  public PolygonFilterExecutorImpl(List<DimColumnResolvedFilterInfo> 
dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression 
exp,
+      AbsoluteTableIdentifier tableIdentifier, SegmentProperties 
segmentProperties,
+      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, 
tableIdentifier, segmentProperties,
+        complexDimensionInfoMap);
+  }
+
+  private int getNearestRangeIndex(List<Long[]> ranges, long searchForNumber) {
+    Long[] range;
+    int low = 0, mid = 0, high = ranges.size() - 1;
+    while (low <= high) {
+      mid = low + ((high - low) / 2);
+      range = ranges.get(mid);
+      if (searchForNumber >= range[0]) {
+        if (searchForNumber <= range[1]) {
+          // Return the range index if the number is between min and max 
values of the range
+          return mid;
+        } else {
+          // Number is bigger than this range's min and max. Search on the 
right side of the range
+          low = mid + 1;
+        }
+      } else {
+        // Number is smaller than this range's min and max. Search on the left 
side of the range
+        high = mid - 1;
+      }
+    }
+    return mid;
+  }
+
+  /**
+   * Checks if the current block or blocklet needs to be scanned
+   * @param maxValue Max value in the current block or blocklet
+   * @param minValue Min value in the current block or blocklet
+   * @return True or False  True if current block or blocklet needs to be 
scanned. Otherwise False.
+   */
+  private boolean isScanRequired(byte[] maxValue, byte[] minValue) {
+    PolygonExpression polygon = (PolygonExpression) exp;
+    List<Long[]> ranges = polygon.getRanges();
+    Long min =
+        (Long) 
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minValue, 
DataTypes.LONG);
+    Long max =
+        (Long) 
DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxValue, 
DataTypes.LONG);
+
+    // Find the nearest possible range index for both the min and max values. 
If a value does not
+    // exist in any of the ranges, get the nearest range index where it 
fits best
+    int startIndex = getNearestRangeIndex(ranges, min);
+    int endIndex = getNearestRangeIndex(ranges, max);
+    if (endIndex > startIndex) {
+       // Multiple ranges fall between min and max. Need to scan this block or 
blocklet
+      return true;
+    }
+    // Got same index for both min and max values.
+    Long[] oneRange = ranges.subList(startIndex, endIndex + 1).get(0);
+    if (min <= oneRange[1] && max >= oneRange[0]) {
+      // [min, max] overlaps the range (also when the range lies inside it)
+      return true;
+    }
+    // No range between min and max values. Scan can be avoided for this block 
or blocklet
+    return false;
+  }
+
+  @Override
+  public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue,
+      boolean[] isMinMaxSet) {
+    assert (exp instanceof PolygonExpression);
+    int dimIndex = dimensionChunkIndex[0];
+    BitSet bitSet = new BitSet(1);
+    if (isMinMaxSet[dimIndex] && isScanRequired(blockMaxValue[dimIndex], 
blockMinValue[dimIndex])) {
+      bitSet.set(0);
+    }
+    return bitSet;
+  }
+}
diff --git 
a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala 
b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 0c72e5c..baf2f61 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -10,6 +10,13 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants
 class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach 
{
   val table1 = "geoTable1"
   val table2 = "geotable2"
+  val result = Seq(Row(116187332, 39979316),
+    Row(116362699, 39942444),
+    Row(116288955, 39999101),
+    Row(116325378, 39963129),
+    Row(116337069, 39951887),
+    Row(116285807, 40084087))
+
   override def beforeAll(): Unit = {
     drop()
   }
@@ -89,12 +96,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with 
BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where 
IN_POLYGON('116.321011 40.123503, " +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 
40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
   }
 
   test("test insert into table select from another table") {
@@ -107,16 +109,10 @@ class GeoTest extends QueryTest with BeforeAndAfterAll 
with BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $targetTable where 
IN_POLYGON('116.321011 40.123503, " +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 
40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
   }
 
-  test("test insert into table select from another table with target table 
sort scope as global")
-  {
+  test("test insert into table select from another table with target table 
sort scope as global") {
     val sourceTable = table1;
     val targetTable = table2;
     createTable(sourceTable)
@@ -126,16 +122,28 @@ class GeoTest extends QueryTest with BeforeAndAfterAll 
with BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $targetTable where 
IN_POLYGON('116.321011 40.123503, " +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 
40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
+  }
+
+  test("test block pruning for polygon query") {
+    createTable()
+    sql(s"insert into $table1 select 1575428400000,116285807,40084087")
+    sql(s"insert into $table1 select 1575428400000,116372142,40129503")
+    sql(s"insert into $table1 select 1575428400000,116187332,39979316")
+    sql(s"insert into $table1 select 1575428400000,116337069,39951887")
+    sql(s"insert into $table1 select 1575428400000,116359102,40154684")
+    sql(s"insert into $table1 select 1575428400000,116736367,39970323")
+    sql(s"insert into $table1 select 1575428400000,116362699,39942444")
+    sql(s"insert into $table1 select 1575428400000,116325378,39963129")
+    sql(s"insert into $table1 select 1575428400000,116302895,39930753")
+    sql(s"insert into $table1 select 1575428400000,116288955,39999101")
+    val df = sql(s"select longitude, latitude from $table1 where 
IN_POLYGON('116.321011 " +
+                 s"40.123503, 116.137676 39.947911, 116.560993 39.935276, 
116.321011 40.123503')")
+    assert(df.rdd.getNumPartitions == 6)
+    checkAnswer(df, result)
   }
 
-  test("test polygon query on table partitioned by timevalue column")
-  {
+  test("test polygon query on table partitioned by timevalue column") {
     sql(s"""
            | CREATE TABLE $table1(
            | longitude LONG,
@@ -156,12 +164,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll 
with BeforeAndAfterEach {
     checkAnswer(
       sql(s"select longitude, latitude from $table1 where 
IN_POLYGON('116.321011 40.123503, " +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 
40.123503')"),
-      Seq(Row(116187332, 39979316),
-        Row(116362699, 39942444),
-        Row(116288955, 39999101),
-        Row(116325378, 39963129),
-        Row(116337069, 39951887),
-        Row(116285807, 40084087)))
+      result)
   }
 
   override def afterEach(): Unit = {

Reply via email to