Fix adding partition information when refreshing (restoring) a table, and fix the case-sensitivity issue with partition column names.
Fix comments Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2dbe6c97 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2dbe6c97 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2dbe6c97 Branch: refs/heads/branch-1.3 Commit: 2dbe6c975b9aec3769750556b4b82a11930ea07d Parents: 08b8af7 Author: ravipesala <[email protected]> Authored: Thu Jan 4 00:34:33 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Fri Jan 5 17:51:49 2018 +0800 ---------------------------------------------------------------------- .../StandardPartitionTableLoadingTestCase.scala | 70 ++++++++++++++++++++ .../management/RefreshCarbonTableCommand.scala | 52 ++++++++++++++- .../strategy/CarbonLateDecodeStrategy.scala | 13 +++- 3 files changed, 131 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbe6c97/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 72e464e..b399138 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -16,6 +16,9 @@ */ package org.apache.carbondata.spark.testsuite.standardpartition +import java.io.{File, IOException} + +import 
org.apache.commons.io.FileUtils import org.apache.spark.sql.test.util.QueryTest import org.apache.spark.sql.{AnalysisException, Row} import org.scalatest.BeforeAndAfterAll @@ -325,6 +328,71 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte checkAnswer(sql(s"select count(*) from emp1"), rows) } + test("test restore partition table") { + sql( + """ + | CREATE TABLE restorepartition (doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int, empname String, designation String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE restorepartition""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE restorepartition PARTITION(empno='99', empname='ravi', designation='xx')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE restorepartition PARTITION(empno='100', empname='indra', designation='yy')""") + val rows = sql("select count(*) from restorepartition").collect() + val partitions = sql("show partitions restorepartition").collect() + val table = CarbonMetadata.getInstance().getCarbonTable("default_restorepartition") + val dblocation = table.getTablePath.substring(0, table.getTablePath.lastIndexOf("/")) + backUpData(dblocation, "restorepartition") + sql("drop table restorepartition") + restoreData(dblocation, "restorepartition") + sql("refresh table restorepartition") + checkAnswer(sql("select count(*) from restorepartition"), rows) + checkAnswer(sql("show partitions restorepartition"), partitions) + } + + test("test case sensitive on partition columns") { + sql( + """ + | CREATE TABLE casesensitivepartition (doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | 
projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empNo int, empName String, designation String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE casesensitivepartition""") + checkAnswer(sql("select * from casesensitivepartition where empNo=17"), + sql("select * from casesensitivepartition where empno=17")) + } + + def restoreData(dblocation: String, tableName: String) = { + val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + val source = dblocation+ "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName + try { + FileUtils.copyDirectory(new File(source), new File(destination)) + FileUtils.deleteDirectory(new File(source)) + } catch { + case e : Exception => + throw new IOException("carbon table data restore failed.") + } finally { + + } + } + def backUpData(dblocation: String, tableName: String) = { + val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName + val destination = dblocation+ "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName + try { + FileUtils.copyDirectory(new File(source), new File(destination)) + } catch { + case e : Exception => + throw new IOException("carbon table data backup failed.", e) + } + } + override def afterAll = { dropTable @@ -347,6 +415,8 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql("drop table if exists loadstaticpartitiononeissue") sql("drop table if exists loadpartitionwithspecialchar") sql("drop table if exists emp1") + sql("drop table if exists restorepartition") + sql("drop table if exists casesensitivepartition") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbe6c97/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala index 45ed298..2983ea4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala @@ -22,16 +22,20 @@ import java.util import scala.collection.JavaConverters._ import org.apache.spark.sql._ -import org.apache.spark.sql.execution.command.MetadataCommand +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand} import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand import org.apache.spark.sql.util.CarbonException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, PartitionMapFileStore} +import org.apache.carbondata.core.metadata.schema.partition.PartitionType import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} +import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.events.{OperationContext, 
OperationListenerBus, RefreshTablePostExecutionEvent, RefreshTablePreExecutionEvent} import org.apache.carbondata.hadoop.util.SchemaReader @@ -91,6 +95,11 @@ case class RefreshCarbonTableCommand( registerAggregates(databaseName, dataMapSchemaList)(sparkSession) } registerTableWithHive(databaseName, tableName, tableInfo)(sparkSession) + // Register partitions to hive metastore in case of hive partitioning carbon table + if (tableInfo.getFactTable.getPartitionInfo != null && + tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) { + registerAllPartitionsToHive(absoluteTableIdentifier, sparkSession) + } } else { LOGGER.audit( s"Table registration with Database name [$databaseName] and Table name [$tableName] " + @@ -205,4 +214,41 @@ case class RefreshCarbonTableCommand( } }) } + + /** + * Read all the partition information which is stored in each segment and add to + * the hive metastore + */ + private def registerAllPartitionsToHive( + absIdentifier: AbsoluteTableIdentifier, + sparkSession: SparkSession): Unit = { + val metadataDetails = + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(absIdentifier.getTablePath)) + // First read all partition information from each segment. 
+ val allpartitions = metadataDetails.map{ metadata => + if (metadata.getSegmentStatus == SegmentStatus.SUCCESS || + metadata.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) { + val mapper = new PartitionMapFileStore() + mapper.readAllPartitionsOfSegment( + CarbonTablePath.getSegmentPath(absIdentifier.getTablePath, metadata.getLoadName)) + Some(mapper.getPartitionMap.values().asScala) + } else { + None + } + }.filter(_.isDefined).map(_.get) + val identifier = + TableIdentifier(absIdentifier.getTableName, Some(absIdentifier.getDatabaseName)) + // Register the partition information to the hive metastore + allpartitions.foreach { segPartitions => + val specs: Seq[TablePartitionSpec] = segPartitions.map { indexPartitions => + indexPartitions.asScala.map{ p => + val spec = p.split("=") + (spec(0), spec(1)) + }.toMap + }.toSeq + // Add partition information + AlterTableAddPartitionCommand(identifier, specs.map((_, None)), true).run(sparkSession) + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/2dbe6c97/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala index ad519e6..6ef3d47 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala @@ -152,9 +152,20 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get)) val partitionKeyFilters = ExpressionSet(ExpressionSet(filterPredicates).filter(_.references.subsetOf(partitionSet))) + // Update the name with lower case as 
it is case sensitive while getting partition info. + val updatedPartitionFilters = partitionKeyFilters.map { exp => + exp.transform { + case attr: AttributeReference => + AttributeReference( + attr.name.toLowerCase, + attr.dataType, + attr.nullable, + attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated) + } + } partitions = CarbonFilters.getPartitions( - partitionKeyFilters.toSeq, + updatedPartitionFilters.toSeq, SparkSession.getActiveSession.get, relation.catalogTable.get.identifier) }
