Repository: carbondata Updated Branches: refs/heads/master b8a02f391 -> 28c94183b
[CARBONDATA-1854] add support for implicit column filter This closes #1644 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/28c94183 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/28c94183 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/28c94183 Branch: refs/heads/master Commit: 28c94183bf5e03f6af05a270b96c30529233332d Parents: b8a02f3 Author: rahulforallp <[email protected]> Authored: Tue Dec 12 14:49:27 2017 +0530 Committer: ravipesala <[email protected]> Committed: Wed Dec 20 08:18:52 2017 +0530 ---------------------------------------------------------------------- .../scan/filter/FilterExpressionProcessor.java | 11 ++++ .../filter/FilterExpressionProcessorTest.java | 68 ++++++++++++++++++++ .../load/DataLoadProcessorStepOnSpark.scala | 5 ++ .../spark/rdd/CarbonDataRDDFactory.scala | 5 ++ 4 files changed, 89 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java index 6c804d7..5a1b7df 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java @@ -403,6 +403,17 @@ public class FilterExpressionProcessor implements FilterProcessor { return new TrueConditionalResolverImpl(expression, false, false, tableIdentifier); case EQUALS: currentCondExpression = (BinaryConditionalExpression) expression; + // check for implicit column in the expression + if (currentCondExpression instanceof InExpression) { + CarbonColumn carbonColumn = + currentCondExpression.getColumnList().get(0).getCarbonColumn(); + if (carbonColumn.hasEncoding(Encoding.IMPLICIT)) { + return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true, + tableIdentifier, + currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure()); + } + } + CarbonColumn column = currentCondExpression.getColumnList().get(0).getCarbonColumn(); if (currentCondExpression.isSingleColumn() && ! column.getDataType().isComplexType()) { if (column.isMeasure()) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java new file mode 100644 index 0000000..1f244a3 --- /dev/null +++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java @@ -0,0 +1,68 @@ +package org.apache.carbondata.core.scan.filter; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest; +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; +import org.apache.carbondata.core.metadata.CarbonTableIdentifier; +import org.apache.carbondata.core.metadata.datatype.DataTypes; +import org.apache.carbondata.core.metadata.encoder.Encoding; +import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; +import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.scan.expression.ColumnExpression; +import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.LiteralExpression; +import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression; +import org.apache.carbondata.core.scan.expression.conditional.InExpression; +import org.apache.carbondata.core.scan.filter.intf.ExpressionType; +import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl; + +import org.junit.Before; +import org.junit.Test; + +public class FilterExpressionProcessorTest extends AbstractDictionaryCacheTest { + + private ColumnSchema columnSchema; + + @Before public void setUp() throws Exception { + init(); + this.databaseName = props.getProperty("database", "testSchema"); + this.tableName = props.getProperty("tableName", "carbon"); + this.carbonStorePath = props.getProperty("storePath", "carbonStore"); + carbonTableIdentifier = + new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString()); + this.carbonStorePath = props.getProperty("storePath", "carbonStore"); + columnSchema = new ColumnSchema(); + columnSchema.setColumnar(true); + columnSchema.setColumnName("IMEI"); + columnSchema.setColumnUniqueId(UUID.randomUUID().toString()); + columnSchema.setDataType(DataTypes.STRING); + columnSchema.setDimensionColumn(true); + List<Encoding> encodingList = new ArrayList<>(); + encodingList.add(Encoding.IMPLICIT); + columnSchema.setEncodingList(encodingList); + } + + @Test public void testGetFilterResolverBasedOnExpressionType() + throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { + CarbonColumn carbonColumn = new CarbonColumn(columnSchema, 0, 0); + ColumnExpression columnExpression = new ColumnExpression("IMEI", DataTypes.STRING); + columnExpression.setCarbonColumn(carbonColumn); + LiteralExpression literalExpression = new LiteralExpression("ImeiValue", DataTypes.STRING); + InExpression equalToExpression = new InExpression(columnExpression, literalExpression); + FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); + Method method = FilterExpressionProcessor.class + .getDeclaredMethod("getFilterResolverBasedOnExpressionType", ExpressionType.class, + boolean.class, Expression.class, AbsoluteTableIdentifier.class, Expression.class); + method.setAccessible(true); + Object result = method + .invoke(filterExpressionProcessor, ExpressionType.EQUALS, false, equalToExpression, null, + null); + assert (result.getClass().getName() + .equals("org.apache.carbondata.core.scan.filter.resolver.ConditionalFilterResolverImpl")); + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 6759b20..5e6ba98 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -19,6 +19,7 @@ package org.apache.carbondata.spark.load import scala.util.Random +import com.univocity.parsers.common.TextParsingException import org.apache.spark.{Accumulator, SparkEnv, TaskContext} import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.Row @@ -234,6 +235,10 @@ object DataLoadProcessorStepOnSpark { private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = { e match { case e: CarbonDataLoadingException => throw e + case e: TextParsingException => + LOGGER.error(e, "Data Loading failed for table " + model.getTableName) + throw new CarbonDataLoadingException("Data Loading failed for table " + model.getTableName, + e) case e: Exception => LOGGER.error(e, "Data Loading failed for table " + model.getTableName) throw new CarbonDataLoadingException("Data Loading failed for table " + model.getTableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 29712de..0b786b5 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer import scala.util.Random import scala.util.control.Breaks._ +import com.univocity.parsers.common.TextParsingException import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.NullWritable @@ -403,6 +404,10 @@ object CarbonDataRDDFactory { sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) { executorMessage = sparkException.getCause.getMessage errorMessage = errorMessage + ": " + executorMessage + } else if (sparkException.getCause.isInstanceOf[TextParsingException]) { + executorMessage = CarbonDataProcessorUtil + .trimErrorMessage(sparkException.getCause.getMessage) + errorMessage = errorMessage + " : " + executorMessage } case _ => if (ex.getCause != null) {
