This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 779320b  [CARBONDATA-3832] Added block and blocklet pruning for the polygon expression processing
779320b is described below
commit 779320b7b70a9af7ee74ff7e25adfc1cc5ebce43
Author: Venu Reddy <[email protected]>
AuthorDate: Sun Mar 22 21:56:45 2020 +0530
[CARBONDATA-3832] Added block and blocklet pruning for the polygon expression
processing
Why is this PR needed?
At present, carbon doesn't do block/blocklet pruning for polygon filter
queries. It does row-level filtering at the carbon layer and returns the
result. With this approach, all the carbon files are scanned irrespective of
whether there are any matching rows in the block. It also incurs spark
overhead to launch many jobs and tasks to process them. This affects the
overall performance of polygon queries.
What changes were proposed in this PR?
Leveraged the existing block pruning mechanism in carbon to skip the unwanted
blocks, thereby reducing the number of splits. On the executor side, used
blocklet pruning to reduce the number of blocklets to be read and scanned.
This closes #3772
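
For context, a minimal standalone sketch of the pruning rule this change
introduces (plain Java with made-up values, not the patch's API): a block or
blocklet can be skipped when its [min, max] of the geo column overlaps none of
the polygon's ranges. The patch locates candidate ranges with a binary search;
the linear scan below is only for illustration.

    import java.util.Arrays;
    import java.util.List;

    public class PruningSketch {
      // ranges: sorted, non-overlapping {start, end} pairs derived from the polygon
      static boolean isScanRequired(List<Long[]> ranges, long blockMin, long blockMax) {
        for (Long[] r : ranges) {
          if (blockMax >= r[0] && blockMin <= r[1]) {
            return true;  // [blockMin, blockMax] overlaps this range: must scan
          }
        }
        return false;     // overlaps nothing: the block/blocklet can be pruned
      }

      public static void main(String[] args) {
        List<Long[]> ranges = Arrays.asList(new Long[]{100L, 200L}, new Long[]{500L, 600L});
        System.out.println(isScanRequired(ranges, 210L, 490L)); // false -> pruned
        System.out.println(isScanRequired(ranges, 150L, 520L)); // true  -> scanned
      }
    }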
---
.../core/scan/expression/UnknownExpression.java | 8 ++
.../carbondata/core/scan/filter/FilterUtil.java | 9 ++
.../executer/RowLevelFilterExecuterImpl.java | 2 +-
.../geo/scan/expression/PolygonExpression.java | 30 ++++--
.../filter/executor/PolygonFilterExecutorImpl.java | 112 +++++++++++++++++++++
.../scala/org/apache/carbondata/geo/GeoTest.scala | 59 +++++------
6 files changed, 185 insertions(+), 35 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
index 2cb26a6..cbea664 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/UnknownExpression.java
@@ -19,8 +19,16 @@ package org.apache.carbondata.core.scan.expression;
import java.util.List;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+
public abstract class UnknownExpression extends Expression {
public abstract List<ColumnExpression> getAllColumnList();
+  public FilterExecuter getFilterExecuter(FilterResolverIntf filterResolverIntf,
+ SegmentProperties segmentProperties) {
+ return null;
+ }
}
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
index 748bafd..6f121ca 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.UnknownExpression;
import org.apache.carbondata.core.scan.expression.conditional.*;
import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException;
import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
@@ -188,6 +189,14 @@ public final class FilterUtil {
return new FalseFilterExecutor();
case ROWLEVEL:
default:
+        if (filterExpressionResolverTree.getFilterExpression() instanceof UnknownExpression) {
+          FilterExecuter filterExecuter =
+              ((UnknownExpression) filterExpressionResolverTree.getFilterExpression())
+                  .getFilterExecuter(filterExpressionResolverTree, segmentProperties);
+          if (filterExecuter != null) {
+            return filterExecuter;
+          }
+        }
return new RowLevelFilterExecuterImpl(
((RowLevelFilterResolverImpl) filterExpressionResolverTree)
.getDimColEvaluatorInfoList(),
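
The hunk above is the dispatch point: when a ROWLEVEL filter wraps an
UnknownExpression, the expression may volunteer its own executor, and a null
return keeps the generic row-level fallback. A self-contained analog of that
pattern (plain Java, not the CarbonData API) might look like:

    interface Executer {
      boolean isScanRequired(long min, long max);
    }

    abstract class UnknownExpr {
      // Default contract mirrored from the patch: null means "no custom
      // executor, fall back to generic row-level filtering".
      Executer getFilterExecuter() {
        return null;
      }
    }

    class PolygonExpr extends UnknownExpr {
      @Override
      Executer getFilterExecuter() {
        return (min, max) -> max >= 100 && min <= 200; // stand-in overlap check
      }
    }

    public class DispatchSketch {
      static Executer choose(UnknownExpr e, Executer rowLevelFallback) {
        Executer custom = e.getFilterExecuter();
        return custom != null ? custom : rowLevelFallback;
      }

      public static void main(String[] args) {
        Executer fallback = (min, max) -> true; // generic executor scans everything
        System.out.println(choose(new PolygonExpr(), fallback).isScanRequired(0, 50));    // false
        System.out.println(choose(new PolygonExpr(), fallback).isScanRequired(150, 160)); // true
      }
    }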
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
index c4d3031..d0429e2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelFilterExecuterImpl.java
@@ -70,7 +70,7 @@ public class RowLevelFilterExecuterImpl implements FilterExecuter {
/**
* it has index at which given dimension is stored in file
*/
- int[] dimensionChunkIndex;
+ protected int[] dimensionChunkIndex;
/**
* it has index at which given measure is stored in file.
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
index ee9971a..953ba48 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -25,15 +25,20 @@ import java.util.Arrays;
import java.util.List;
import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.scan.expression.ColumnExpression;
import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.expression.ExpressionResult;
import org.apache.carbondata.core.scan.expression.UnknownExpression;
import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecuter;
import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.geo.scan.filter.executor.PolygonFilterExecutorImpl;
/**
* InPolygon expression processor. It inputs the InPolygon string to the Geo implementation's
@@ -46,15 +51,16 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
private CustomIndex<List<Long[]>> instance;
private List<Long[]> ranges = new ArrayList<Long[]>();
private ColumnExpression column;
- private ExpressionResult trueExpRes;
- private ExpressionResult falseExpRes;
+ private static final ExpressionResult trueExpRes =
+ new ExpressionResult(DataTypes.BOOLEAN, true);
+ private static final ExpressionResult falseExpRes =
+ new ExpressionResult(DataTypes.BOOLEAN, false);
+
public PolygonExpression(String polygon, String columnName, CustomIndex indexInstance) {
this.polygon = polygon;
this.instance = indexInstance;
this.column = new ColumnExpression(columnName, DataTypes.LONG);
- this.trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
- this.falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
}
private void validate(List<Long[]> ranges) {
@@ -79,6 +85,10 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
}
}
+ public List<Long[]> getRanges() {
+ return ranges;
+ }
+
private boolean rangeBinarySearch(List<Long[]> ranges, long searchForNumber) {
Long[] range;
int low = 0, mid, high = ranges.size() - 1;
@@ -147,8 +157,6 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
instance = (CustomIndex<List<Long[]>>) in.readObject();
column = (ColumnExpression) in.readObject();
ranges = new ArrayList<Long[]>();
- trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
- falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
}
@Override
@@ -170,4 +178,14 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
public List<ExpressionResult> getLiterals() {
return null;
}
+
+ @Override
+ public FilterExecuter getFilterExecuter(FilterResolverIntf resolver,
+ SegmentProperties segmentProperties) {
+ assert (resolver instanceof RowLevelFilterResolverImpl);
+    RowLevelFilterResolverImpl rowLevelResolver = (RowLevelFilterResolverImpl) resolver;
+    return new PolygonFilterExecutorImpl(rowLevelResolver.getDimColEvaluatorInfoList(),
+        rowLevelResolver.getMsrColEvalutorInfoList(), rowLevelResolver.getFilterExpresion(),
+ rowLevelResolver.getTableIdentifier(), segmentProperties, null);
+ }
}
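
A side note on the trueExpRes/falseExpRes change above: making them static
final is what allows dropping their re-initialization in readObject, since
static fields are per-class and are never written or cleared by Java
serialization. A toy demonstration of that general behavior (not CarbonData
code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class StaticSerDemo {
      static class Toy implements Serializable {
        static final String CONSTANT = "survives deserialization"; // never serialized
        int payload = 42;                                          // serialized per instance
      }

      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(new Toy());
        Toy back = (Toy) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(Toy.CONSTANT + " / " + back.payload); // both available
      }
    }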
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
new file mode 100644
index 0000000..094dbe8
--- /dev/null
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/filter/executor/PolygonFilterExecutorImpl.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.filter.executor;
+
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.GenericQueryType;
+import org.apache.carbondata.core.scan.filter.executer.RowLevelFilterExecuterImpl;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;
+import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.MeasureColumnResolvedFilterInfo;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.geo.scan.expression.PolygonExpression;
+
+/**
+ * Polygon filter executor. Prunes Blocks and Blocklets based on the selected ranges of polygon.
+ */
+public class PolygonFilterExecutorImpl extends RowLevelFilterExecuterImpl {
+  public PolygonFilterExecutorImpl(List<DimColumnResolvedFilterInfo> dimColEvaluatorInfoList,
+      List<MeasureColumnResolvedFilterInfo> msrColEvalutorInfoList, Expression exp,
+      AbsoluteTableIdentifier tableIdentifier, SegmentProperties segmentProperties,
+      Map<Integer, GenericQueryType> complexDimensionInfoMap) {
+    super(dimColEvaluatorInfoList, msrColEvalutorInfoList, exp, tableIdentifier, segmentProperties,
+ complexDimensionInfoMap);
+ }
+
+ private int getNearestRangeIndex(List<Long[]> ranges, long searchForNumber) {
+ Long[] range;
+ int low = 0, mid = 0, high = ranges.size() - 1;
+ while (low <= high) {
+ mid = low + ((high - low) / 2);
+ range = ranges.get(mid);
+ if (searchForNumber >= range[0]) {
+ if (searchForNumber <= range[1]) {
+          // Return the range index if the number is between min and max values of the range
+ return mid;
+ } else {
+          // Number is bigger than this range's min and max. Search on the right side of the range
+ low = mid + 1;
+ }
+ } else {
+        // Number is smaller than this range's min and max. Search on the left side of the range
+ high = mid - 1;
+ }
+ }
+ return mid;
+ }
+
+  /**
+   * Checks if the current block or blocklet needs to be scanned
+   * @param maxValue Max value in the current block or blocklet
+   * @param minValue Min value in the current block or blocklet
+   * @return True if the current block or blocklet needs to be scanned, otherwise False.
+   */
+ private boolean isScanRequired(byte[] maxValue, byte[] minValue) {
+ PolygonExpression polygon = (PolygonExpression) exp;
+ List<Long[]> ranges = polygon.getRanges();
+    Long min =
+        (Long) DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(minValue, DataTypes.LONG);
+    Long max =
+        (Long) DataTypeUtil.getDataBasedOnDataTypeForNoDictionaryColumn(maxValue, DataTypes.LONG);
+
+    // Find the nearest possible range index for both the min and max values. If a value does not
+    // exist in any of the ranges, get the preceding range index where it fits best
+ int startIndex = getNearestRangeIndex(ranges, min);
+ int endIndex = getNearestRangeIndex(ranges, max);
+ if (endIndex > startIndex) {
+      // Multiple ranges fall between min and max. Need to scan this block or blocklet
+ return true;
+ }
+ // Got same index for both min and max values.
+ Long[] oneRange = ranges.subList(startIndex, endIndex + 1).get(0);
+    if ((min >= oneRange[0] && min <= oneRange[1]) || (max >= oneRange[0] && max <= oneRange[1])) {
+ // Either min or max is within the range
+ return true;
+ }
+    // No range between min and max values. Scan can be avoided for this block or blocklet
+ return false;
+ }
+
+ @Override
+  public BitSet isScanRequired(byte[][] blockMaxValue, byte[][] blockMinValue,
+      boolean[] isMinMaxSet) {
+ assert (exp instanceof PolygonExpression);
+ int dimIndex = dimensionChunkIndex[0];
+ BitSet bitSet = new BitSet(1);
+    if (isMinMaxSet[dimIndex] && isScanRequired(blockMaxValue[dimIndex], blockMinValue[dimIndex])) {
+ bitSet.set(0);
+ }
+ return bitSet;
+ }
+}
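
To trace the pruning decision above with concrete numbers, here is a hedged
standalone restatement (simplified, outside CarbonData; the ranges and min/max
values are made up). It follows the same steps: binary-search the nearest range
index for the block's min and max, scan if multiple ranges fall between them,
otherwise scan only if min or max lands inside the single candidate range.

    import java.util.Arrays;
    import java.util.List;

    public class NearestRangeDemo {
      // Same shape as getNearestRangeIndex: returns the index of the range that
      // contains n, or the index where the search settled if no range contains it.
      static int nearestRangeIndex(List<Long[]> ranges, long n) {
        int low = 0, mid = 0, high = ranges.size() - 1;
        while (low <= high) {
          mid = low + (high - low) / 2;
          Long[] r = ranges.get(mid);
          if (n >= r[0]) {
            if (n <= r[1]) {
              return mid;    // n lies inside this range
            }
            low = mid + 1;   // n is beyond this range's max: search right
          } else {
            high = mid - 1;  // n is below this range's min: search left
          }
        }
        return mid;
      }

      public static void main(String[] args) {
        List<Long[]> ranges = Arrays.asList(
            new Long[]{10L, 20L}, new Long[]{40L, 50L}, new Long[]{70L, 80L});
        long min = 25L, max = 35L; // a block whose values sit between two ranges
        int start = nearestRangeIndex(ranges, min); // settles at index 0
        int end = nearestRangeIndex(ranges, max);   // settles at index 0
        Long[] one = ranges.get(start);             // single candidate: {10, 20}
        boolean scan = end > start
            || (min >= one[0] && min <= one[1]) || (max >= one[0] && max <= one[1]);
        System.out.println(scan); // false: no range overlaps [25, 35], block pruned
      }
    }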
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 0c72e5c..baf2f61 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -10,6 +10,13 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
val table1 = "geoTable1"
val table2 = "geotable2"
+ val result = Seq(Row(116187332, 39979316),
+ Row(116362699, 39942444),
+ Row(116288955, 39999101),
+ Row(116325378, 39963129),
+ Row(116337069, 39951887),
+ Row(116285807, 40084087))
+
override def beforeAll(): Unit = {
drop()
}
@@ -89,12 +96,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
checkAnswer(
sql(s"select longitude, latitude from $table1 where
IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011
40.123503')"),
- Seq(Row(116187332, 39979316),
- Row(116362699, 39942444),
- Row(116288955, 39999101),
- Row(116325378, 39963129),
- Row(116337069, 39951887),
- Row(116285807, 40084087)))
+ result)
}
test("test insert into table select from another table") {
@@ -107,16 +109,10 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
checkAnswer(
sql(s"select longitude, latitude from $targetTable where
IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011
40.123503')"),
- Seq(Row(116187332, 39979316),
- Row(116362699, 39942444),
- Row(116288955, 39999101),
- Row(116325378, 39963129),
- Row(116337069, 39951887),
- Row(116285807, 40084087)))
+ result)
}
- test("test insert into table select from another table with target table
sort scope as global")
- {
+ test("test insert into table select from another table with target table
sort scope as global") {
val sourceTable = table1;
val targetTable = table2;
createTable(sourceTable)
@@ -126,16 +122,28 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
checkAnswer(
sql(s"select longitude, latitude from $targetTable where
IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011
40.123503')"),
- Seq(Row(116187332, 39979316),
- Row(116362699, 39942444),
- Row(116288955, 39999101),
- Row(116325378, 39963129),
- Row(116337069, 39951887),
- Row(116285807, 40084087)))
+ result)
+ }
+
+ test("test block pruning for polygon query") {
+ createTable()
+ sql(s"insert into $table1 select 1575428400000,116285807,40084087")
+ sql(s"insert into $table1 select 1575428400000,116372142,40129503")
+ sql(s"insert into $table1 select 1575428400000,116187332,39979316")
+ sql(s"insert into $table1 select 1575428400000,116337069,39951887")
+ sql(s"insert into $table1 select 1575428400000,116359102,40154684")
+ sql(s"insert into $table1 select 1575428400000,116736367,39970323")
+ sql(s"insert into $table1 select 1575428400000,116362699,39942444")
+ sql(s"insert into $table1 select 1575428400000,116325378,39963129")
+ sql(s"insert into $table1 select 1575428400000,116302895,39930753")
+ sql(s"insert into $table1 select 1575428400000,116288955,39999101")
+ val df = sql(s"select longitude, latitude from $table1 where
IN_POLYGON('116.321011 " +
+ s"40.123503, 116.137676 39.947911, 116.560993 39.935276,
116.321011 40.123503')")
+ assert(df.rdd.getNumPartitions == 6)
+ checkAnswer(df, result)
}
- test("test polygon query on table partitioned by timevalue column")
- {
+ test("test polygon query on table partitioned by timevalue column") {
sql(s"""
| CREATE TABLE $table1(
| longitude LONG,
@@ -156,12 +164,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
checkAnswer(
sql(s"select longitude, latitude from $table1 where
IN_POLYGON('116.321011 40.123503, " +
s"116.137676 39.947911, 116.560993 39.935276, 116.321011
40.123503')"),
- Seq(Row(116187332, 39979316),
- Row(116362699, 39942444),
- Row(116288955, 39999101),
- Row(116325378, 39963129),
- Row(116337069, 39951887),
- Row(116285807, 40084087)))
+ result)
}
override def afterEach(): Unit = {