[CARBONDATA-1937][PARTITION] Fix partition fetch failure when a null partition value is present in integral columns
Hive appears to fail when partitions are queried from the metastore with a filter if any integral partition column contains a null value. As a workaround, partitions can now be read by fetching the full list of partitions from Hive and applying the filter on the client side; the new property carbon.read.partition.hive.direct (default true) selects between the direct filtered query and this alternate path, and the alternate path is also used as a fallback whenever the direct query throws an exception. This closes #1730 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7f3c374b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7f3c374b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7f3c374b Branch: refs/heads/branch-1.3 Commit: 7f3c374bacdf54c9eac91a908701e3ea8dd369e0 Parents: 03ddcc8 Author: ravipesala <[email protected]> Authored: Wed Dec 27 22:32:46 2017 +0530 Committer: Jacky Li <[email protected]> Committed: Tue Jan 2 16:56:06 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 9 +++++ .../StandardPartitionTableQueryTestCase.scala | 16 ++++++++ .../spark/sql/optimizer/CarbonFilters.scala | 39 +++++++++++++++++++- .../src/main/spark2.1/CarbonSessionState.scala | 31 +++++++++++++++- .../src/main/spark2.2/CarbonSessionState.scala | 21 ++++++++++- 5 files changed, 111 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index 2021222..a05d023 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1501,6 +1501,15 @@ public final class CarbonCommonConstants { public static final String
TIMESERIES_HIERARCHY = "timeseries.hierarchy"; + /** + * It allows queries on hive metastore directly along with filter information, otherwise first + * fetches all partitions from hive and apply filters on it. + */ + @CarbonProperty + public static final String CARBON_READ_PARTITION_HIVE_DIRECT = + "carbon.read.partition.hive.direct"; + public static final String CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT = "true"; + private CarbonCommonConstants() { } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala index b3c91ae..8a09093 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala @@ -208,6 +208,20 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), Seq(Row(2))) } + test("test partition fails on int null partition") { + sql("create table badrecordsPartitionintnull(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'") + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnull options('bad_records_action'='force')") + checkAnswer(sql("select count(*) cnt from 
badrecordsPartitionintnull where intfield2 = 13"), Seq(Row(1))) + } + + test("test partition fails on int null partition read alternate") { + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, "false") + sql("create table badrecordsPartitionintnullalt(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'") + sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnullalt options('bad_records_action'='force')") + checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnullalt where intfield2 = 13"), Seq(Row(1))) + CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT) + } + test("static column partition with load command") { sql( """ @@ -249,6 +263,8 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA sql("drop table if exists staticpartitionload") sql("drop table if exists badrecordsPartitionignore") sql("drop table if exists badrecordsPartitionfail") + sql("drop table if exists badrecordsPartitionintnull") + sql("drop table if exists badrecordsPartitionintnullalt") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala index 24fd732..09546cd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala @@ -23,12 +23,15 @@ import org.apache.spark.sql.execution.CastExpressionOptimization import 
org.apache.spark.sql.types._ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.CarbonSessionCatalog +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} import org.apache.carbondata.core.scan.expression.conditional._ import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -405,13 +408,45 @@ object CarbonFilters { } } + /** + * Fetches partition information from hive + * @param partitionFilters + * @param sparkSession + * @param identifier + * @return + */ def getPartitions(partitionFilters: Seq[Expression], sparkSession: SparkSession, identifier: TableIdentifier): Seq[String] = { - val partitions = - sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters) + val partitions = { + try { + if (CarbonProperties.getInstance(). + getProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, + CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT).toBoolean) { + // read partitions directly from hive metastore using filters + sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters) + } else { + // Read partitions alternatively by firts get all partitions then filter them + sparkSession.sessionState.catalog. 
+ asInstanceOf[CarbonSessionCatalog].getPartitionsAlternate( + partitionFilters, + sparkSession, + identifier) + } + } catch { + case e: Exception => + // Get partition information alternatively. + sparkSession.sessionState.catalog. + asInstanceOf[CarbonSessionCatalog].getPartitionsAlternate( + partitionFilters, + sparkSession, + identifier) + } + } partitions.toList.flatMap { partition => partition.spec.seq.map{case (column, value) => column + "=" + value} }.toSet.toSeq } + + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index a6f28c9..dae6249 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.hive import org.apache.hadoop.conf.Configuration import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} -import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery} +import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogUtils, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog} +import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.parser.ParserUtils._ @@ -134,6 +134,33 @@ class CarbonSessionCatalog( def getClient(): org.apache.spark.sql.hive.client.HiveClient = { 
sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive } + + /** + * This is alternate way of getting partition information. It first fetches all partitions from + * hive and then apply filter instead of querying hive along with filters. + * @param partitionFilters + * @param sparkSession + * @param identifier + * @return + */ + def getPartitionsAlternate(partitionFilters: Seq[Expression], + sparkSession: SparkSession, + identifier: TableIdentifier) = { + val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) + val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier) + val partitionSchema = catalogTable.partitionSchema + if (partitionFilters.nonEmpty) { + val boundPredicate = + InterpretedPredicate.create(partitionFilters.reduce(And).transform { + case att: AttributeReference => + val index = partitionSchema.indexWhere(_.name == att.name) + BoundReference(index, partitionSchema(index).dataType, nullable = true) + }) + allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) } + } else { + allPartitions + } + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index a722cbf..c8ea275 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry} import org.apache.spark.sql.catalyst.catalog._ -import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery} +import org.apache.spark.sql.catalyst.expressions.{And, 
AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery} import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.parser.ParserUtils.string @@ -141,6 +141,25 @@ class CarbonSessionCatalog( sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog .asInstanceOf[HiveExternalCatalog].client } + + /** + * This is alternate way of getting partition information. It first fetches all partitions from + * hive and then apply filter instead of querying hive along with filters. + * @param partitionFilters + * @param sparkSession + * @param identifier + * @return + */ + def getPartitionsAlternate(partitionFilters: Seq[Expression], + sparkSession: SparkSession, + identifier: TableIdentifier) = { + val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier) + ExternalCatalogUtils.prunePartitionsByFilter( + sparkSession.sessionState.catalog.getTableMetadata(identifier), + allPartitions, + partitionFilters, + sparkSession.sessionState.conf.sessionLocalTimeZone) + } }
