Repository: carbondata Updated Branches: refs/heads/branch-1.3 8b105a1e1 -> 9f73f0e60
[CARBONDATA-2134] Prevent implicit column filter list from getting serialized while submitting task to executor Problem: In the current store, blocklet pruning is done in the driver and no further pruning takes place on the executor side, but the implicit column filter list is still sent to the executor. As the size of the list grows, the cost of serializing and deserializing it increases, which can impact query performance. Solution: Remove the list from the filter expression before submitting the task to the executor. This closes #1935 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9f73f0e6 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9f73f0e6 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9f73f0e6 Branch: refs/heads/branch-1.3 Commit: 9f73f0e60611c52278d2d475a89d42adebf32f60 Parents: 8b105a1 Author: m00258959 <manish.gu...@huawei.com> Authored: Mon Feb 5 17:10:18 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Fri Feb 9 13:03:47 2018 +0530 ---------------------------------------------------------------------- .../carbondata/core/scan/filter/FilterUtil.java | 35 ++++++++++++- .../core/scan/filter/FilterUtilTest.java | 48 ++++++++++++++++++ .../org/apache/carbondata/spark/util/Util.java | 19 +++++++ .../carbondata/spark/rdd/CarbonScanRDD.scala | 52 +++++++++++++++++++- 4 files changed, 152 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f73f0e6/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java index a08edc0..689da9f 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterUtil.java @@ -67,9 +67,12 @@ import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.ExpressionResult; import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.InExpression; import org.apache.carbondata.core.scan.expression.conditional.ListExpression; import org.apache.carbondata.core.scan.expression.exception.FilterIllegalMemberException; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.expression.logical.AndExpression; +import org.apache.carbondata.core.scan.expression.logical.TrueExpression; import org.apache.carbondata.core.scan.filter.executer.AndFilterExecuterImpl; import org.apache.carbondata.core.scan.filter.executer.DimColumnExecuterFilterInfo; import org.apache.carbondata.core.scan.filter.executer.ExcludeColGroupFilterExecuterImpl; @@ -1824,4 +1827,34 @@ public final class FilterUtil { } return columnFilterInfo; } -} \ No newline at end of file + + /** + * This method will check for ColumnExpression with column name positionID and if found will + * replace the InExpression with true expression. This is done to stop serialization of List + * expression which is right children of InExpression as it can impact the query performance + * as the size of list grows bigger. 
+ * + * @param expression + */ + public static void removeInExpressionNodeWithPositionIdColumn(Expression expression) { + ExpressionType filterExpressionType = expression.getFilterExpressionType(); + if (ExpressionType.AND == filterExpressionType) { + Expression rightExpression = ((AndExpression) expression).getRight(); + if (rightExpression instanceof InExpression) { + List<Expression> children = rightExpression.getChildren(); + if (null != children && !children.isEmpty()) { + Expression childExpression = children.get(0); + // check for the positionId as the column name in ColumnExpression + if (childExpression instanceof ColumnExpression && ((ColumnExpression) childExpression) + .getColumnName().equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) { + // Remove the right expression node and point the expression to left node expression + expression + .findAndSetChild(((AndExpression) expression).getRight(), new TrueExpression(null)); + LOGGER.info("In expression removed from the filter expression list to prevent it from" + + " serializing on executor"); + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f73f0e6/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java index 89b3122..565da04 100644 --- a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java @@ -35,8 +35,11 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; import org.apache.carbondata.core.scan.expression.LiteralExpression; +import 
org.apache.carbondata.core.scan.expression.conditional.InExpression; import org.apache.carbondata.core.scan.expression.conditional.ListExpression; import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException; +import org.apache.carbondata.core.scan.expression.logical.AndExpression; +import org.apache.carbondata.core.scan.expression.logical.TrueExpression; import org.apache.carbondata.core.scan.filter.intf.RowImpl; import org.apache.carbondata.core.util.BitSetGroup; @@ -399,4 +402,49 @@ public class FilterUtilTest extends AbstractDictionaryCacheTest { FilterUtil.createBitSetGroupWithDefaultValue(15, 448200, true); assertTrue(bitSetGroupWithDefaultValue.getNumberOfPages() == 15); } + + @Test public void testRemoveInExpressionNodeWithPositionIdColumn() { + List<Expression> children = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + // create literal expression + LiteralExpression literalExpression = + new LiteralExpression("0/1/0-0_batchno0-0-1517808273200/0", DataTypes.STRING); + children.add(literalExpression); + // create list expression + ListExpression listExpression = new ListExpression(children); + // create column expression with column name as positionId + ColumnExpression columnExpression = + new ColumnExpression(CarbonCommonConstants.POSITION_ID, DataTypes.STRING); + // create InExpression as right node + InExpression inExpression = new InExpression(columnExpression, listExpression); + // create a dummy true expression as left node + TrueExpression trueExpression = new TrueExpression(null); + // create and expression as the root node + Expression expression = new AndExpression(trueExpression, inExpression); + // test remove expression method + FilterUtil.removeInExpressionNodeWithPositionIdColumn(expression); + // after removing the right node instance of right node should be of true expression + assert (((AndExpression) expression).getRight() instanceof TrueExpression); + } + + @Test public void 
testRemoveInExpressionNodeWithDifferentColumn() { + List<Expression> children = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + // create literal expression + LiteralExpression literalExpression = + new LiteralExpression("testName", DataTypes.STRING); + children.add(literalExpression); + // create list expression + ListExpression listExpression = new ListExpression(children); + // create column expression with column name as positionId + ColumnExpression columnExpression = new ColumnExpression("name", DataTypes.STRING); + // create InExpression as right node + InExpression inExpression = new InExpression(columnExpression, listExpression); + // create a dummy true expression as left node + TrueExpression trueExpression = new TrueExpression(null); + // create and expression as the root node + Expression expression = new AndExpression(trueExpression, inExpression); + // test remove expression method + FilterUtil.removeInExpressionNodeWithPositionIdColumn(expression); + // after removing the right node instance of right node should be of true expression + assert (((AndExpression) expression).getRight() instanceof InExpression); + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f73f0e6/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java index 8c14cd3..cd2b81c 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/Util.java @@ -17,6 +17,10 @@ package org.apache.carbondata.spark.util; +import java.util.List; + +import org.apache.carbondata.hadoop.CarbonInputSplit; + import org.apache.spark.SparkConf; import org.apache.spark.util.Utils; @@ -27,4 +31,19 
@@ public class Util { public static String[] getConfiguredLocalDirs(SparkConf conf) { return Utils.getConfiguredLocalDirs(conf); } + + /** + * Method to check whether there exists any block which does not contain the blocklet info + * + * @param splitList + * @return + */ + public static boolean isBlockWithoutBlockletInfoExists(List<CarbonInputSplit> splitList) { + for (CarbonInputSplit inputSplit : splitList) { + if (null == inputSplit.getDetailInfo().getBlockletInfo()) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/9f73f0e6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 102c6c8..e554a58 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Random +import scala.util.control.Breaks.{break, breakable} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf @@ -42,6 +43,7 @@ import org.apache.carbondata.core.datastore.block.Distributable import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.TableInfo import org.apache.carbondata.core.scan.expression.Expression +import org.apache.carbondata.core.scan.filter.FilterUtil import org.apache.carbondata.core.scan.model.QueryModel import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstants, QueryStatisticsRecorder} import 
org.apache.carbondata.core.statusmanager.FileFormat @@ -51,7 +53,7 @@ import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.InitInputMetrics -import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl +import org.apache.carbondata.spark.util.{SparkDataTypeConverterImpl, Util} /** * This RDD is used to perform query on CarbonData file. Before sending tasks to scan @@ -109,6 +111,8 @@ class CarbonScanRDD( } } val batchPartitions = distributeColumnarSplits(columnarSplits) + // check and remove InExpression from filterExpression + checkAndRemoveInExpressinFromFilterExpression(format, batchPartitions) if (streamSplits.isEmpty) { batchPartitions.toArray } else { @@ -471,6 +475,52 @@ class CarbonScanRDD( } /** + * This method will check and remove InExpression from filterExpression to prevent the List + * Expression values from serializing and deserializing on executor + * + * @param format + * @param identifiedPartitions + */ + private def checkAndRemoveInExpressinFromFilterExpression( + format: CarbonTableInputFormat[Object], + identifiedPartitions: mutable.Buffer[Partition]) = { + if (null != filterExpression) { + if (identifiedPartitions.nonEmpty && + !checkForBlockWithoutBlockletInfo(identifiedPartitions)) { + FilterUtil.removeInExpressionNodeWithPositionIdColumn(filterExpression) + } + } + } + + /** + * This method will check for presence of any block from old store (version 1.1). 
If any of the + * blocks identified does not contain the blocklet info that means that block is from old store + * + * @param identifiedPartitions + * @return + */ + private def checkForBlockWithoutBlockletInfo( + identifiedPartitions: mutable.Buffer[Partition]): Boolean = { + var isBlockWithoutBlockletInfoPresent = false + breakable { + identifiedPartitions.foreach { value => + val inputSplit = value.asInstanceOf[CarbonSparkPartition].split.value + val splitList = if (inputSplit.isInstanceOf[CarbonMultiBlockSplit]) { + inputSplit.asInstanceOf[CarbonMultiBlockSplit].getAllSplits + } else { + new java.util.ArrayList().add(inputSplit.asInstanceOf[CarbonInputSplit]) + }.asInstanceOf[java.util.List[CarbonInputSplit]] + // check for block from old store (version 1.1 and below) + if (Util.isBlockWithoutBlockletInfoExists(splitList)) { + isBlockWithoutBlockletInfoPresent = true + break + } + } + } + isBlockWithoutBlockletInfoPresent + } + + /** * Get the preferred locations where to launch this task. */ override def getPreferredLocations(split: Partition): Seq[String] = {