[CARBONDATA-1976][PARTITION] Support combination of dynamic and static partitions, and fix the concurrent partition load issue.
Support combination of dynamic and static partitions. This closes #1755 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/af4277e9 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/af4277e9 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/af4277e9 Branch: refs/heads/branch-1.3 Commit: af4277e9495faac3082e8c179cc049bcbeb699e2 Parents: 829e7aa Author: ravipesala <[email protected]> Authored: Wed Jan 3 10:48:09 2018 +0530 Committer: Jacky Li <[email protected]> Committed: Sun Jan 7 00:01:24 2018 +0800 ---------------------------------------------------------------------- .../core/metadata/PartitionMapFileStore.java | 40 +++++- .../hadoop/api/CarbonOutputCommitter.java | 8 +- .../StandardPartitionTableDropTestCase.scala | 28 +++++ .../StandardPartitionTableLoadingTestCase.scala | 49 +++++++- ...tandardPartitionTableOverwriteTestCase.scala | 74 +++++++++++ .../spark/rdd/CarbonDropPartitionRDD.scala | 8 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 6 +- .../management/CarbonLoadDataCommand.scala | 122 +++++++++++-------- ...rbonAlterTableDropHivePartitionCommand.scala | 19 ++- .../datasources/CarbonFileFormat.scala | 48 ++++++-- .../sql/parser/CarbonSpark2SqlParser.scala | 4 +- 11 files changed, 327 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java index d29dfbb..f7074c4 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java @@ -237,15 +237,21 @@ public class PartitionMapFileStore { * @param segmentPath * @param partitionsToDrop * @param uniqueId + * @param partialMatch If it is true then even the partial partition spec matches also can be + * dropped * @throws IOException */ - public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId) - throws IOException { + public void dropPartitions(String segmentPath, List<String> partitionsToDrop, String uniqueId, + boolean partialMatch) throws IOException { readAllPartitionsOfSegment(segmentPath); List<String> indexesToDrop = new ArrayList<>(); for (Map.Entry<String, List<String>> entry: partitionMap.entrySet()) { - for (String partition: partitionsToDrop) { - if (entry.getValue().contains(partition)) { + if (partialMatch) { + if (entry.getValue().containsAll(partitionsToDrop)) { + indexesToDrop.add(entry.getKey()); + } + } else { + if (partitionsToDrop.containsAll(entry.getValue())) { indexesToDrop.add(entry.getKey()); } } @@ -302,7 +308,7 @@ public class PartitionMapFileStore { LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath()); // scan through each segment. - + List<String> segmentsNeedToBeDeleted = new ArrayList<>(); for (LoadMetadataDetails segment : details) { // if this segment is valid then only we will go for deletion of related @@ -318,6 +324,12 @@ public class PartitionMapFileStore { String partitionFilePath = getPartitionFilePath(segmentPath); if (partitionFilePath != null) { PartitionMapper partitionMapper = readPartitionMap(partitionFilePath); + if (partitionMapper.partitionMap.size() == 0) { + // There is no partition information, it means all partitions are dropped. + // So segment need to be marked as delete. 
+ segmentsNeedToBeDeleted.add(segment.getLoadName()); + continue; + } DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore(); indexFileStore.readAllIIndexOfSegment(segmentPath); @@ -356,7 +368,11 @@ public class PartitionMapFileStore { // Delete all old partition files for (CarbonFile partitionFile : partitionFiles) { if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) { - partitionFile.delete(); + long fileTimeStamp = Long.parseLong(partitionFile.getName().substring(0, + partitionFile.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length())); + if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimeStamp) || forceDelete) { + partitionFile.delete(); + } } } } @@ -370,6 +386,18 @@ public class PartitionMapFileStore { } } } + // If any segments that are required to delete + if (segmentsNeedToBeDeleted.size() > 0) { + try { + // Mark the segments as delete. + SegmentStatusManager.updateDeletionStatus( + table.getAbsoluteTableIdentifier(), + segmentsNeedToBeDeleted, + table.getMetaDataFilepath()); + } catch (Exception e) { + throw new IOException(e); + } + } } public List<String> getPartitions(String indexFileName) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java index 525249a..bc7c56f 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java @@ -83,7 +83,13 @@ public class CarbonOutputCommitter extends FileOutputCommitter { * @throws IOException */ @Override public void 
commitJob(JobContext context) throws IOException { - super.commitJob(context); + try { + super.commitJob(context); + } catch (IOException e) { + // ignore, in case of concurrent load it try to remove temporary folders by other load may + // cause file not found exception. This will not impact carbon load, + LOGGER.warn(e.getMessage()); + } boolean overwriteSet = CarbonTableOutputFormat.isOverwriteSet(context.getConfiguration()); CarbonLoadModel loadModel = CarbonTableOutputFormat.getLoadModel(context.getConfiguration()); LoadMetadataDetails newMetaEntry = loadModel.getCurrentLoadMetadataDetail(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala index 9a9940b..2aa9145 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala @@ -155,6 +155,33 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl } + + test("dropping all partition on table and do compaction") { + sql( + """ + | CREATE TABLE partitionallcompaction (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj 
Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='Learning')""") + sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='configManagement')""") + sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='network')""") + sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='protocol')""") + sql(s"""ALTER TABLE partitionallcompaction DROP PARTITION(deptname='security')""") + assert(sql(s"""SHOW PARTITIONS partitionallcompaction""").collect().length == 0) + sql("ALTER TABLE partitionallcompaction COMPACT 'MAJOR'").collect() + checkAnswer( + sql(s"""select count (*) from partitionallcompaction"""), + Seq(Row(0))) + } + override def afterAll = { dropTable } @@ -167,6 +194,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl sql("drop table if exists partitionmany") sql("drop table if exists partitionshow") sql("drop table if exists staticpartition") + sql("drop table if exists partitionallcompaction") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index b399138..25e73c4 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.testsuite.standardpartition import java.io.{File, IOException} +import java.util +import java.util.concurrent.{Callable, ExecutorService, Executors} import org.apache.commons.io.FileUtils import org.apache.spark.sql.test.util.QueryTest @@ -31,7 +33,7 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll { - + var executorService: ExecutorService = _ override def beforeAll { dropTable @@ -275,6 +277,47 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte } } + test("concurrent partition table load test") { + executorService = Executors.newCachedThreadPool() + sql( + """ + | CREATE TABLE partitionmultiplethreeconcurrent (empno int, doj Timestamp, + | workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (workgroupcategory int, empname String, designation String) + | STORED BY 'org.apache.carbondata.format' + | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname') + """.stripMargin) + + val tasks = new util.ArrayList[Callable[String]]() + tasks.add(new 
QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")) + tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")) + tasks.add(new QueryTask(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethreeconcurrent OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")) + val results = executorService.invokeAll(tasks) + for (i <- 0 until tasks.size()) { + val res = results.get(i).get + assert("PASS".equals(res)) + } + executorService.shutdown() + checkAnswer(sql("select count(*) from partitionmultiplethreeconcurrent"), Seq(Row(30))) + } + + class QueryTask(query: String) extends Callable[String] { + override def call(): String = { + var result = "PASS" + try { + LOGGER.info("Executing :" + Thread.currentThread().getName) + sql(query) + } catch { + case ex: Exception => + ex.printStackTrace() + result = "FAIL" + } + result + } + } + test("merge carbon index disable data loading for partition table for three partition column") { CarbonProperties.getInstance.addProperty( CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false") @@ -396,6 +439,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte override def afterAll = { dropTable + if (executorService != null && !executorService.isShutdown) { + executorService.shutdownNow() + } } def dropTable = { @@ -413,6 +459,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte sql("drop table if exists streamingpartitionedtable") sql("drop table if exists mergeindexpartitionthree") sql("drop table if exists loadstaticpartitiononeissue") + sql("drop table if exists partitionmultiplethreeconcurrent") sql("drop table if exists loadpartitionwithspecialchar") sql("drop table if exists emp1") sql("drop table if exists restorepartition") 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala index 945542a..15126b6 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala @@ -16,6 +16,7 @@ */ package org.apache.carbondata.spark.testsuite.standardpartition +import org.apache.spark.sql.Row import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -85,6 +86,75 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)")) } + test("dynamic and static partition table with load syntax") { + sql( + """ + | CREATE TABLE loadstaticpartitiondynamic (designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | PARTITIONED BY (empno int, empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql(s"""LOAD DATA local inpath 
'$resourcesPath/data.csv' INTO TABLE loadstaticpartitiondynamic PARTITION(empno='1', empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + checkAnswer(sql(s"select count(*) from loadstaticpartitiondynamic where empno=1"), sql(s"select count(*) from loadstaticpartitiondynamic")) + } + + test("dynamic and static partition table with overwrite ") { + sql( + """ + | CREATE TABLE insertstaticpartitiondynamic (designation String, doj Timestamp,salary int) + | PARTITIONED BY (empno int, empname String) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect() + sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname) select designation, doj, salary, empname from insertstaticpartitiondynamic""") + + checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1"), rows) + + intercept[Exception] { + sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno, empname='ravi') select designation, doj, salary, empname from insertstaticpartitiondynamic""") + } + + } + + test("overwriting all partition on table and do compaction") { + sql( + """ + | CREATE TABLE partitionallcompaction (empno int, empname String, designation String, + | workgroupcategory int, workgroupcategoryname String, deptno int, + | projectjoindate Timestamp, projectenddate Date,attendance int, + | utilization int,salary int) + | PARTITIONED BY (deptname String,doj Timestamp,projectcode int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE 
partitionallcompaction PARTITION(deptname='Learning', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"') """) + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='configManagement', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='network', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='protocol', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE partitionallcompaction PARTITION(deptname='security', doj, projectcode) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql("ALTER TABLE partitionallcompaction COMPACT 'MAJOR'").collect() + checkExistence(sql(s"""SHOW segments for table partitionallcompaction"""), true, "Marked for Delete") + } + + test("Test overwrite static partition ") { + sql( + """ + | CREATE TABLE weather6 (type String) + | PARTITIONED BY (year int, month int, day int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + + sql("insert into weather6 partition(year=2014, month=5, day=25) select 'rainy'") + sql("insert into weather6 partition(year=2014, month=4, day=23) select 'cloudy'") + sql("insert overwrite table weather6 partition(year=2014, month=5, day=25) select 'sunny'") + checkExistence(sql("select * from weather6"), true, "sunny") + checkAnswer(sql("select count(*) from weather6"), Seq(Row(2))) + } + override def afterAll = { dropTable @@ -94,6 +164,10 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf sql("drop table if exists originTable") sql("drop table if exists partitiondateinsert") sql("drop table if exists staticpartitiondateinsert") + 
sql("drop table if exists loadstaticpartitiondynamic") + sql("drop table if exists insertstaticpartitiondynamic") + sql("drop table if exists partitionallcompaction") + sql("drop table if exists weather6") } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala index d377c4d..0a79295 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala @@ -37,13 +37,16 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String) * @param sc * @param tablePath * @param segments segments to be merged + * @param partialMatch If it is true then even the partial partition spec matches also can be + * dropped */ class CarbonDropPartitionRDD( sc: SparkContext, tablePath: String, segments: Seq[String], partitions: Seq[String], - uniqueId: String) + uniqueId: String, + partialMatch: Boolean) extends CarbonRDD[String](sc, Nil) { override def getPartitions: Array[Partition] = { @@ -60,7 +63,8 @@ class CarbonDropPartitionRDD( new PartitionMapFileStore().dropPartitions( split.segmentPath, partitions.toList.asJava, - uniqueId) + uniqueId, + partialMatch) var havePair = false var finished = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index 66351e3..3da603b 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -998,10 +998,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { case _ => ("", "") } - protected lazy val partitions: Parser[(String, String)] = - (ident <~ "=") ~ stringLit ^^ { + protected lazy val partitions: Parser[(String, Option[String])] = + (ident <~ "=".?) ~ stringLit.? ^^ { case opt ~ optvalue => (opt.trim, optvalue) - case _ => ("", "") + case _ => ("", None) } protected lazy val valueOptions: Parser[(Int, Int)] = http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 0c6879c..60adcd7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -602,7 +602,7 @@ case class CarbonLoadDataCommand( val lowerCasePartition = partition.map{case(key, value) => (key.toLowerCase, value)} catalogTable.schema.map { attr => attributes.find(_.name.equalsIgnoreCase(attr.name)).get - }.filter(attr => lowerCasePartition.get(attr.name.toLowerCase).isEmpty) + }.filter(attr => 
lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty) } else { catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get) } @@ -713,7 +713,12 @@ case class CarbonLoadDataCommand( options += (("onepass", loadModel.getUseOnePass.toString)) options += (("dicthost", loadModel.getDictionaryServerHost)) options += (("dictport", loadModel.getDictionaryServerPort.toString)) - options += (("staticpartition", partition.nonEmpty.toString)) + if (partition.nonEmpty) { + val staticPartitionStr = ObjectSerializationUtil.convertObjectToString( + new util.HashMap[String, Boolean]( + partition.map{case (col, value) => (col.toLowerCase, value.isDefined)}.asJava)) + options += (("staticpartition", staticPartitionStr)) + } options += (("operationcontext", operationContextStr)) options ++= this.options if (updateModel.isDefined) { @@ -745,63 +750,80 @@ case class CarbonLoadDataCommand( sparkSession: SparkSession, table: CarbonTable, logicalPlan: LogicalPlan): Unit = { - sparkSession.sessionState.catalog.listPartitions( - TableIdentifier(table.getTableName, Some(table.getDatabaseName)), - Some(partition.map(f => (f._1, f._2.get)))) - val partitionNames = partition.map(k => k._1 + "=" + k._2.get).toSet + val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName)) + val existingPartitions = sparkSession.sessionState.catalog.listPartitions( + identifier, + Some(partition.filter(_._2.isDefined).map(f => (f._1, f._2.get)))) + val partitionNames = existingPartitions.toList.flatMap { partition => + partition.spec.seq.map{case (column, value) => column + "=" + value} + }.toSet val uniqueId = System.currentTimeMillis().toString val segments = new SegmentStatusManager( table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments - try { - // First drop the partitions from partition mapper files of each segment - new CarbonDropPartitionRDD( - sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - 
partitionNames.toSeq, - uniqueId).collect() - } catch { - case e: Exception => - // roll back the drop partitions from carbon store - new CarbonDropPartitionCommitRDD( + // If any existing partitions need to be overwritten then drop from partitionmap + if (partitionNames.nonEmpty) { + try { + // First drop the partitions from partition mapper files of each segment + new CarbonDropPartitionRDD( sparkSession.sparkContext, table.getTablePath, segments.asScala, - false, - uniqueId).collect() - throw e - } + partitionNames.toSeq, + uniqueId, + partialMatch = false).collect() + } catch { + case e: Exception => + // roll back the drop partitions from carbon store + new CarbonDropPartitionCommitRDD( + sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + success = false, + uniqueId).collect() + throw e + } - try { + try { + Dataset.ofRows(sparkSession, logicalPlan) + } catch { + case e: Exception => + // roll back the drop partitions from carbon store + new CarbonDropPartitionCommitRDD( + sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + success = false, + uniqueId).collect() + throw e + } + // Commit the removed partitions in carbon store. + new CarbonDropPartitionCommitRDD( + sparkSession.sparkContext, + table.getTablePath, + segments.asScala, + success = true, + uniqueId).collect() + // get valid segments + val validsegments = + new SegmentStatusManager( + table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments + // Update the loadstatus with update time to clear cache from driver. + CarbonUpdateUtil.updateTableMetadataStatus( + new util.HashSet[String](validsegments), + table, + uniqueId, + true, + new util.ArrayList[String]) + DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) + // Clean the overwriting segments if any. 
+ new PartitionMapFileStore().cleanSegments( + table, + CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava, + false) + } else { + // Otherwise its a normal load Dataset.ofRows(sparkSession, logicalPlan) - } catch { - case e: Exception => - // roll back the drop partitions from carbon store - new CarbonDropPartitionCommitRDD( - sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - false, - uniqueId).collect() - throw e } - // Commit the removed partitions in carbon store. - new CarbonDropPartitionCommitRDD( - sparkSession.sparkContext, - table.getTablePath, - segments.asScala, - true, - uniqueId).collect() - // Update the loadstatus with update time to clear cache from driver. - val segmentSet = new util.HashSet[String](new SegmentStatusManager(table - .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments) - CarbonUpdateUtil.updateTableMetadataStatus( - segmentSet, - table, - uniqueId, - true, - new util.ArrayList[String]) - DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) } def getDataFrameWithTupleID(): DataFrame = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala index c68e43c..dbd686b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala @@ -25,11 +25,13 @@ import 
org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand} +import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} +import org.apache.carbondata.core.metadata.PartitionMapFileStore import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD} @@ -73,15 +75,19 @@ case class CarbonAlterTableDropHivePartitionCommand( } // Drop the partitions from hive. - AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) - .run(sparkSession) + AlterTableDropPartitionCommand( + tableName, + specs, + ifExists, + purge, + retainData).run(sparkSession) } Seq.empty[Row] } override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = { - AlterTableAddPartitionCommand(tableName, specs.map((_, None)), ifExists) + AlterTableAddPartitionCommand(tableName, specs.map((_, None)), true) val msg = s"Got exception $exception when processing data of drop partition." 
+ "Adding back partitions to the metadata" LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg) @@ -114,7 +120,8 @@ case class CarbonAlterTableDropHivePartitionCommand( table.getTablePath, segments.asScala, partitionNames.toSeq, - uniqueId).collect() + uniqueId, + partialMatch = true).collect() } catch { case e: Exception => // roll back the drop partitions from carbon store @@ -143,6 +150,10 @@ case class CarbonAlterTableDropHivePartitionCommand( DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier) } finally { AlterTableUtil.releaseLocks(locks) + new PartitionMapFileStore().cleanSegments( + table, + new util.ArrayList(CarbonFilters.getPartitions(Seq.empty, sparkSession, tableName).asJava), + false) } Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 4b368de..36df787 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -42,6 +42,7 @@ import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat} import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter +import org.apache.carbondata.hadoop.util.ObjectSerializationUtil import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable import org.apache.carbondata.processing.loading.model.CarbonLoadModel import 
org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util} @@ -204,9 +205,14 @@ private class CarbonOutputWriter(path: String, fieldTypes: Seq[DataType]) extends OutputWriter with AbstractCarbonOutputWriter { val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName) - val staticPartition = { + val staticPartition: util.HashMap[String, Boolean] = { val staticPart = context.getConfiguration.get("carbon.staticpartition") - staticPart != null && staticPart.toBoolean + if (staticPart != null) { + ObjectSerializationUtil.convertStringToObject( + staticPart).asInstanceOf[util.HashMap[String, Boolean]] + } else { + null + } } lazy val partitionData = if (partitions.nonEmpty) { val updatedPartitions = partitions.map{ p => @@ -223,7 +229,7 @@ private class CarbonOutputWriter(path: String, } } - if (staticPartition) { + if (staticPartition != null) { val loadModel = recordWriter.getLoadModel val table = loadModel.getCarbonDataLoadSchema.getCarbonTable var timeStampformatString = loadModel.getTimestampformat @@ -237,11 +243,17 @@ private class CarbonOutputWriter(path: String, } val dateFormat = new SimpleDateFormat(dateFormatString) updatedPartitions.map {case (col, value) => - CarbonScalaUtil.convertToCarbonFormat(value, - CarbonScalaUtil.convertCarbonToSparkDataType( - table.getColumnByName(table.getTableName, col).getDataType), - timeFormat, - dateFormat) + // Only convert the static partitions to the carbon format and use it while loading data + // to carbon. 
+ if (staticPartition.getOrDefault(col, false)) { + CarbonScalaUtil.convertToCarbonFormat(value, + CarbonScalaUtil.convertCarbonToSparkDataType( + table.getColumnByName(table.getTableName, col).getDataType), + timeFormat, + dateFormat) + } else { + value + } } } else { updatedPartitions.map(_._2) @@ -309,9 +321,25 @@ private class CarbonOutputWriter(path: String, (col, value) }.toMap val updatedPartitions = - if (staticPartition) { - splitPartitions + if (staticPartition != null) { + // There can be scnerio like dynamic and static combination, in that case we should convert + // only the dyanamic partition values to the proper format and store to carbon parttion map + splitPartitions.map { case (col, value) => + if (!staticPartition.getOrDefault(col, false)) { + CarbonScalaUtil.updatePartitions( + Seq((col, value)).toMap, + table, + timeFormat, + dateFormat, + serializeFormat, + badRecordAction, + isEmptyBadRecord).toSeq.head + } else { + (col, value) + } + } } else { + // All dynamic partitions need to be converted to proper format CarbonScalaUtil.updatePartitions( splitPartitions, table, http://git-wip-us.apache.org/repos/asf/carbondata/blob/af4277e9/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index a25be06..4045478 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -361,7 +361,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { validateOptions(optionsList) } val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap - val partitionSpec = partitions.getOrElse(List.empty[(String, 
String)]).toMap + val partitionSpec = partitions.getOrElse(List.empty[(String, Option[String])]).toMap CarbonLoadDataCommand( databaseNameOp = convertDbNameToLowerCase(databaseNameOp), tableName = tableName, @@ -374,7 +374,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { updateModel = None, tableInfoOp = None, internalOptions = Map.empty, - partition = partitionSpec.map { case (key, value) => (key, Some(value))}) + partition = partitionSpec) } protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
