Repository: carbondata Updated Branches: refs/heads/master bc2e897cb -> 47aafabb3
[CARBONDATA-1860][PARTITION] Support insertoverwrite for a specific partition This closes #1700 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/47aafabb Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/47aafabb Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/47aafabb Branch: refs/heads/master Commit: 47aafabb3bc2f64d828ed129746a5aea1bb5454c Parents: bc2e897 Author: ravipesala <[email protected]> Authored: Tue Dec 19 13:19:15 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Thu Dec 21 09:06:49 2017 +0530 ---------------------------------------------------------------------- .../core/metadata/PartitionMapFileStore.java | 33 ++----- .../StandardPartitionTableDropTestCase.scala | 2 +- ...tandardPartitionTableOverwriteTestCase.scala | 99 ++++++++++++++++++++ .../spark/rdd/CarbonDropPartitionRDD.scala | 5 +- .../management/CarbonLoadDataCommand.scala | 99 ++++++++++++++++++-- .../CarbonStandardAlterTableDropPartition.scala | 24 +++-- 6 files changed, 215 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java index f60b69f..b44f99b 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java @@ -228,7 +228,7 @@ public class PartitionMapFileStore { } PartitionMapper mapper = new PartitionMapper(); mapper.setPartitionMap(partitionMap); - String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT; + String path = segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp"; writePartitionFile(mapper, path); } } @@ -241,33 +241,14 @@ public class PartitionMapFileStore { * @param success */ public void commitPartitions(String segmentPath, final String uniqueId, boolean success) { - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath); + CarbonFile carbonFile = FileFactory + .getCarbonFile(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT + ".tmp"); // write partition info to new file. if (carbonFile.exists()) { - CarbonFile[] carbonFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile file) { - return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT); - } - }); - CarbonFile latestFile = null; - for (CarbonFile mapFile: carbonFiles) { - if (mapFile.getName().startsWith(uniqueId)) { - latestFile = mapFile; - } - } - if (latestFile != null) { - for (CarbonFile mapFile : carbonFiles) { - if (latestFile != mapFile) { - // Remove old files in case of success scenario - if (success) { - mapFile.delete(); - } - } - } - } - // If it is failure scenario then remove the new file. - if (!success && latestFile != null) { - latestFile.delete(); + if (success) { + carbonFile.renameForce(segmentPath + "/" + uniqueId + CarbonTablePath.PARTITION_MAP_EXT); + } else { + carbonFile.delete(); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala index 9a9940b..2a25255 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala @@ -156,7 +156,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl } override def afterAll = { - dropTable +// dropTable } def dropTable = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala new file mode 100644 index 0000000..945542a --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.spark.testsuite.standardpartition + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + dropTable + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy") + sql( + """ + | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + } + + test("overwriting static partition table for date partition column on insert query") { + sql( + """ + | CREATE TABLE staticpartitiondateinsert (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int) + | PARTITIONED BY (projectenddate Date,doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary from originTable where projectenddate=cast('2016-06-29' as Date)""") +// sql(s"""insert overwrite table partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + checkAnswer(sql("select * from staticpartitiondateinsert where projectenddate=cast('2016-06-29' as Date)"), + sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)")) + } + + test("overwriting partition table for date partition column on insert query") { + sql( + """ + | CREATE TABLE partitiondateinsert (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp,attendance int, + | deptname String,projectcode int, + | utilization int,salary int) + | PARTITIONED BY (projectenddate Date,doj Timestamp) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert into partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""") + sql(s"""insert overwrite table partitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)""") + checkAnswer(sql("select * from partitiondateinsert"), + sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)")) + } + + + override def afterAll = { + dropTable + } + + def dropTable = { + sql("drop table if exists originTable") + sql("drop table if exists partitiondateinsert") + sql("drop table if exists staticpartitiondateinsert") + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala index 09d9da1..d377c4d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala @@ -94,10 +94,11 @@ class CarbonDropPartitionRDD( * @param tablePath * @param segments segments to be merged */ -class CarbonDropPartitionRollbackRDD( +class CarbonDropPartitionCommitRDD( sc: SparkContext, tablePath: String, segments: Seq[String], + success: Boolean, uniqueId: String) extends CarbonRDD[String](sc, Nil) { @@ -112,7 +113,7 @@ class CarbonDropPartitionRollbackRDD( val split = theSplit.asInstanceOf[CarbonDropPartition] logInfo("Commit partition information from : " + split.segmentPath) - new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, false) + new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success) var havePair = false var finished = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index a883735..69be362 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.command.management import java.text.SimpleDateFormat +import java.util import scala.collection.JavaConverters._ import scala.collection.mutable @@ -34,7 +35,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute} import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project} import org.apache.spark.sql.execution.LogicalRDD import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel} @@ -46,6 +47,7 @@ import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants} +import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer} import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider @@ -53,7 +55,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.statusmanager.SegmentStatus +import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus} @@ -67,7 +69,7 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer import org.apache.carbondata.spark.exception.MalformedCarbonCommandException -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel} +import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD, DictionaryLoadModel} import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil} case class CarbonLoadDataCommand( @@ -567,14 +569,18 @@ case class CarbonLoadDataCommand( carbonLoadModel, sparkSession) } - Dataset.ofRows( - sparkSession, + val convertedPlan = CarbonReflectionUtils.getInsertIntoCommand( convertRelation, partition, query, - isOverwriteTable, - false)) + false, + false) + if (isOverwriteTable && partition.nonEmpty) { + overwritePartition(sparkSession, table, convertedPlan) + } else { + Dataset.ofRows(sparkSession, convertedPlan) + } } private def convertToLogicalRelation( @@ -597,14 +603,18 @@ case class CarbonLoadDataCommand( } val partitionSchema = StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(field => - metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get)) - + metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get)) + val overWriteLocal = if (overWrite && partition.nonEmpty) { + false + } else { + overWrite + } val dataSchema = StructType(metastoreSchema .filterNot(field => partitionSchema.contains(field.name))) val options = new mutable.HashMap[String, String]() options ++= catalogTable.storage.properties - options += (("overwrite", overWrite.toString)) + options += (("overwrite", overWriteLocal.toString)) options += (("onepass", loadModel.getUseOnePass.toString)) options += (("dicthost", loadModel.getDictionaryServerHost)) options += (("dictport", loadModel.getDictionaryServerPort.toString)) @@ -621,6 +631,75 @@ case class CarbonLoadDataCommand( Some(catalogTable)) } + /** + * Overwrite the partition data if static partitions are specified. + * @param sparkSession + * @param table + * @param logicalPlan + */ + private def overwritePartition( + sparkSession: SparkSession, + table: CarbonTable, + logicalPlan: LogicalPlan): Unit = { + sparkSession.sessionState.catalog.listPartitions( + TableIdentifier(table.getTableName, Some(table.getDatabaseName)), + Some(partition.map(f => (f._1, f._2.get)))) + val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet + val uniqueId = System.currentTimeMillis().toString + val segments = new SegmentStatusManager( + table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments + try { + // First drop the partitions from partition mapper files of each segment + new CarbonDropPartitionRDD( + sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + partitionNames.toSeq, + uniqueId).collect() + } catch { + case e: Exception => + // roll back the drop partitions from carbon store + new CarbonDropPartitionCommitRDD( + sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + false, + uniqueId).collect() + throw e + } + + try { + Dataset.ofRows(sparkSession, logicalPlan) + } catch { + case e: Exception => + // roll back the drop partitions from carbon store + new CarbonDropPartitionCommitRDD( + sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + false, + uniqueId).collect() + throw e + } + // Commit the removed partitions in carbon store. + new CarbonDropPartitionCommitRDD( + sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + true, + uniqueId).collect() + // Update the loadstatus with update time to clear cache from driver. + val segmentSet = new util.HashSet[String](new SegmentStatusManager(table + .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments) + CarbonUpdateUtil.updateTableMetadataStatus( + segmentSet, + table, + uniqueId, + true, + new util.ArrayList[String]) + DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) + } + def getDataFrameWithTupleID(): DataFrame = { val fields = dataFrame.get.schema.fields import org.apache.spark.sql.functions.udf http://git-wip-us.apache.org/repos/asf/carbondata/blob/47aafabb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala index b787aa7..fd59587 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonStandardAlterTableDropPartition.scala @@ -32,7 +32,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.spark.rdd.{CarbonDropPartitionRDD, CarbonDropPartitionRollbackRDD} +import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} /** * Drop the partitions from hive and carbon store. It drops the partitions in following steps @@ -118,20 +118,28 @@ case class CarbonStandardAlterTableDropPartition( } catch { case e: Exception => // roll back the drop partitions from carbon store - new CarbonDropPartitionRollbackRDD(sparkSession.sparkContext, + new CarbonDropPartitionCommitRDD(sparkSession.sparkContext, table.getTablePath, segments.asScala, + false, uniqueId).collect() throw e } + // commit the drop partitions from carbon store + new CarbonDropPartitionCommitRDD(sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + true, + uniqueId).collect() + // Update the loadstatus with update time to clear cache from driver. val segmentSet = new util.HashSet[String](new SegmentStatusManager(table .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments) - CarbonUpdateUtil - .updateTableMetadataStatus(segmentSet, - table, - uniqueId, - true, - new util.ArrayList[String]) + CarbonUpdateUtil.updateTableMetadataStatus( + segmentSet, + table, + uniqueId, + true, + new util.ArrayList[String]) DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) } finally { AlterTableUtil.releaseLocks(locks)
