This is an automated email from the ASF dual-hosted git repository. ajantha pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new 8ff487f [CARBONDATA-3548] Polygon expression processing using unknown expression and filtering performance improvement 8ff487f is described below commit 8ff487f3b3d6e0a0d561f0cc763db14aff5a51e9 Author: Venu Reddy <venugopalred...@huawei.com> AuthorDate: Wed Feb 5 15:39:57 2020 +0530 [CARBONDATA-3548] Polygon expression processing using unknown expression and filtering performance improvement Why is this PR needed? This PR improves the query processing performance of in_polygon UDF. What changes were proposed in this PR? At present, PolygonExpression processing leverages the existing InExpression. PolygonExpression internally creates a InExpression as a child to it. InExpression is constructed/build from the result of Quad tree algorithm. Algorithm returns the list of ranges(with each range having min and max Id for that range). And this list is a sorted one. InExpression constitute of 2 childs. One child is a columnExpression(for geohash column) and the other is a ListExpression( with List of LiternalExpressions. One LiteralExpression for each Id returned from algo). Problems associated with this approach: We expand the list of ranges(with each range having minand max) to all individual Ids. And create LiteralExpression for each Id. Since we can have large ranges(and the numerous ranges), it consumes huge amount of memory in processing. Due to same reason, it slows does the filter execution. Modifications with this PR: Instead we can use UnknownExpression with RowLevelFilterResolverImpl and RowLevelFilterExecuterImpl processing. And override evaluate() method to do the binary search on the list of ranges directly. This will significanly improve the polygon filter query performance. And Polygon filter expression type is not required anymore at Carbon-Core module. Does this PR introduce any user interface change? No. Is any new testcase added? Yes. Added an end to end test case This closes #3616 --- .../scan/filter/FilterExpressionProcessor.java | 2 - .../core/scan/filter/intf/ExpressionType.java | 3 +- .../geo/scan/expression/PolygonExpression.java | 97 ++++++++++++++++------ .../scala/org/apache/carbondata/geo/GeoTest.scala | 36 ++++++-- 4 files changed, 98 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java index 4d80fbc..e5405ce 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java @@ -168,8 +168,6 @@ public class FilterExpressionProcessor implements FilterProcessor { case TRUE: return getFilterResolverBasedOnExpressionType(ExpressionType.TRUE, false, expressionTree, tableIdentifier, expressionTree); - case POLYGON: - return createFilterResolverTree(expressionTree.getChildren().get(0), tableIdentifier); default: return getFilterResolverBasedOnExpressionType(ExpressionType.UNKNOWN, false, expressionTree, tableIdentifier, expressionTree); diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java index 5614dda..a89a84f 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java @@ -44,6 +44,5 @@ public enum ExpressionType { ENDSWITH, CONTAINSWITH, TEXT_MATCH, - IMPLICIT, - POLYGON + IMPLICIT } diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java index 433866f..5f27b9a 100644 --- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java +++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.carbondata.common.annotations.InterfaceAudience; @@ -28,9 +29,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.ExpressionResult; -import org.apache.carbondata.core.scan.expression.LiteralExpression; -import org.apache.carbondata.core.scan.expression.conditional.InExpression; -import org.apache.carbondata.core.scan.expression.conditional.ListExpression; +import org.apache.carbondata.core.scan.expression.UnknownExpression; +import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression; import org.apache.carbondata.core.scan.filter.intf.ExpressionType; import org.apache.carbondata.core.scan.filter.intf.RowIntf; import org.apache.carbondata.core.util.CustomIndex; @@ -41,64 +41,85 @@ import org.apache.carbondata.core.util.CustomIndex; * InExpression with list of all the IDs present in those list of ranges. */ @InterfaceAudience.Internal -public class PolygonExpression extends Expression { +public class PolygonExpression extends UnknownExpression implements ConditionalExpression { private String polygon; - private String columnName; private CustomIndex<List<Long[]>> handler; - private List<Expression> children = new ArrayList<Expression>(); + private List<Long[]> ranges = new ArrayList<Long[]>(); + private ColumnExpression column; + private ExpressionResult trueExpRes; + private ExpressionResult falseExpRes; public PolygonExpression(String polygon, String columnName, CustomIndex handler) { this.polygon = polygon; this.handler = handler; - this.columnName = columnName; + this.column = new ColumnExpression(columnName, DataTypes.LONG); + this.trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true); + this.falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false); } - private void buildExpression(List<Long[]> ranges) { - // Build InExpression with list of all the values present in the ranges - List<Expression> inList = new ArrayList<Expression>(); + private void validate(List<Long[]> ranges) { + // Validate the ranges for (Long[] range : ranges) { if (range.length != 2) { throw new RuntimeException("Handler query must return list of ranges with each range " + "containing minimum and maximum values"); } - for (long i = range[0]; i <= range[1]; i++) { - inList.add(new LiteralExpression(i, DataTypes.LONG)); - } } - children.add(new InExpression(new ColumnExpression(columnName, DataTypes.LONG), - new ListExpression(inList))); } /** - * This method builds InExpression with list of all the values present in the list of ranges of - * IDs. + * This method calls the query processor and gets the list of ranges of IDs. */ private void processExpression() { - List<Long[]> ranges; try { ranges = handler.query(polygon); + validate(ranges); } catch (Exception e) { throw new RuntimeException(e); } - buildExpression(ranges); + } + + private boolean rangeBinarySearch(List<Long[]> ranges, long searchForNumber) { + Long[] range; + int low = 0, mid, high = ranges.size() - 1; + while (low <= high) { + mid = low + ((high - low) / 2); + range = ranges.get(mid); + if (searchForNumber >= range[0]) { + if (searchForNumber <= range[1]) { + // Return true if the number is between min and max values of the range + return true; + } else { + // Number is bigger than this range's min and max. Search on the right side of the range + low = mid + 1; + } + } else { + // Number is smaller than this range's min and max. Search on the left side of the range + high = mid - 1; + } + } + return false; } @Override public ExpressionResult evaluate(RowIntf value) { - throw new UnsupportedOperationException("Operation not supported for Polygon expression"); + if (rangeBinarySearch(ranges, (Long) value.getVal(0))) { + return trueExpRes; + } + return falseExpRes; } @Override public ExpressionType getFilterExpressionType() { - return ExpressionType.POLYGON; + return ExpressionType.UNKNOWN; } @Override public List<Expression> getChildren() { - if (children.isEmpty()) { + if (ranges.isEmpty()) { processExpression(); } - return children; + return super.getChildren(); } @Override @@ -107,7 +128,7 @@ public class PolygonExpression extends Expression { @Override public String getString() { - return polygon; + return getStatement(); } @Override @@ -117,14 +138,36 @@ public class PolygonExpression extends Expression { private void writeObject(ObjectOutputStream out) throws IOException { out.writeObject(polygon); - out.writeObject(columnName); out.writeObject(handler); + out.writeObject(column); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { polygon = (String) in.readObject(); - columnName = (String) in.readObject(); handler = (CustomIndex<List<Long[]>>) in.readObject(); - children = new ArrayList<Expression>(); + column = (ColumnExpression) in.readObject(); + ranges = new ArrayList<Long[]>(); + trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true); + falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false); + } + + @Override + public List<ColumnExpression> getAllColumnList() { + return new ArrayList<ColumnExpression>(Arrays.asList(column)); + } + + @Override + public List<ColumnExpression> getColumnList() { + return getAllColumnList(); + } + + @Override + public boolean isSingleColumn() { + return true; + } + + @Override + public List<ExpressionResult> getLiterals() { + return null; } } diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala index 7f05cc8..81c86ac 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala @@ -1,12 +1,13 @@ package org.apache.carbondata.geo +import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest -import org.scalatest.BeforeAndAfterAll +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants -class GeoTest extends QueryTest with BeforeAndAfterAll { +class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter { override def beforeAll(): Unit = { drop() } @@ -80,6 +81,23 @@ class GeoTest extends QueryTest with BeforeAndAfterAll { } } + test("test polygon query") { + createTable() + loadData() + checkAnswer( + sql(s"select longitude, latitude from geotable where IN_POLYGON('116.321011 40.123503, " + + s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"), + Seq(Row(116187332, 39979316), + Row(116362699, 39942444), + Row(116288955, 39999101), + Row(116325378, 39963129), + Row(116337069, 39951887), + Row(116285807, 40084087))) + } + + after { + drop() + } override def afterAll(): Unit = { drop() } @@ -98,13 +116,13 @@ class GeoTest extends QueryTest with BeforeAndAfterAll { | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash', | 'INDEX_HANDLER.mygeohash.type'='geohash', | 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude', - | 'INDEX_HANDLER.mygeohash.originLatitude'='1', - | 'INDEX_HANDLER.mygeohash.gridSize'='2', - | 'INDEX_HANDLER.mygeohash.minLongitude'='1', - | 'INDEX_HANDLER.mygeohash.maxLongitude'='4', - | 'INDEX_HANDLER.mygeohash.minLatitude'='1', - | 'INDEX_HANDLER.mygeohash.maxLatitude'='4', - | 'INDEX_HANDLER.mygeohash.conversionRatio'='1') + | 'INDEX_HANDLER.mygeohash.originLatitude'='39.832277', + | 'INDEX_HANDLER.mygeohash.gridSize'='50', + | 'INDEX_HANDLER.mygeohash.minLongitude'='115.811865', + | 'INDEX_HANDLER.mygeohash.maxLongitude'='116.782233', + | 'INDEX_HANDLER.mygeohash.minLatitude'='39.832277', + | 'INDEX_HANDLER.mygeohash.maxLatitude'='40.225281', + | 'INDEX_HANDLER.mygeohash.conversionRatio'='1000000') """.stripMargin) }