Repository: carbondata Updated Branches: refs/heads/master ed8564421 -> 25d949cfa
[HOTFIX] Fix partition filter slow issue #2740 Problem: In FileSourceScanExec it lists all the files of partitions from CatalogFileIndex, which causes creation of another job to list files for each query. Solution: We don't want the CatalogFileIndex to list any files, so make the CatalogFileIndex act as a dummy. This closes #2740 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/25d949cf Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/25d949cf Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/25d949cf Branch: refs/heads/master Commit: 25d949cfa82c9a29fe0e54ddbe54e890cc865b7f Parents: ed85644 Author: ravipesala <[email protected]> Authored: Thu Sep 20 21:21:47 2018 +0530 Committer: kumarvishal09 <[email protected]> Committed: Mon Sep 24 12:54:19 2018 +0530 ---------------------------------------------------------------------- .../execution/datasources/CarbonFileIndex.scala | 14 ++++++++++++++ .../strategy/CarbonLateDecodeStrategy.scala | 15 ++++++++++----- 2 files changed, 24 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/25d949cf/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala ---------------------------------------------------------------------- diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala index 3a650ec..c57528f 100644 --- a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala +++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala @@ -51,6 +51,10 @@ 
class CarbonFileIndex( fileIndex: FileIndex) extends FileIndex with AbstractCarbonFileIndex { + // When this flag is set it just returns empty files during pruning. It is needed for carbon + // session partition flow as we handle directly through datamap pruining. + private var actAsDummy = false + override def rootPaths: Seq[Path] = fileIndex.rootPaths override def inputFiles: Array[String] = fileIndex.inputFiles @@ -70,6 +74,9 @@ class CarbonFileIndex( */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { + if (actAsDummy) { + return Seq.empty + } val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get val directories = method.invoke( @@ -143,11 +150,18 @@ class CarbonFileIndex( } override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = { + if (actAsDummy) { + return Seq.empty + } val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get val directories = method.invoke(fileIndex, filters).asInstanceOf[Seq[PartitionDirectory]] prune(filters, directories) } + + def setDummy(actDummy: Boolean): Unit = { + actAsDummy = actDummy + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/25d949cf/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index 8f128fe..f0184cd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.optimizer.{CarbonDecoderRelation, CarbonFilters} import 
org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types._ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} -import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil +import org.apache.spark.sql.carbondata.execution.datasources.{CarbonFileIndex, CarbonSparkDataSourceUtil} import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException @@ -704,11 +704,16 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { val sparkSession = relation.relation.sqlContext.sparkSession relation.catalogTable match { case Some(catalogTable) => - HadoopFsRelation( + val fileIndex = new CarbonFileIndex(sparkSession, + catalogTable.schema, + catalogTable.storage.properties, new CatalogFileIndex( - sparkSession, - catalogTable, - sizeInBytes = relation.relation.sizeInBytes), + sparkSession, + catalogTable, + sizeInBytes = relation.relation.sizeInBytes)) + fileIndex.setDummy(true) + HadoopFsRelation( + fileIndex, catalogTable.partitionSchema, catalogTable.schema, catalogTable.bucketSpec,
