[CARBONDATA-1316] Support drop partition function

This closes #1317
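For context, the DDL this patch enables looks roughly as follows. The shape is inferred from AlterTableDropPartitionModel (a partition id plus a dropWithData flag) and the CarbonSpark2SqlParser change listed in the diffstat below, so treat the exact grammar, and the table name t1, as assumptions rather than confirmed syntax:

    // Hedged sketch of exercising the new DROP PARTITION command from Spark.
    // Assumes a Carbon-enabled SparkSession; the grammar is inferred, not
    // confirmed by this excerpt.
    import org.apache.spark.sql.SparkSession

    object DropPartitionExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("DropPartitionExample").getOrCreate()
        // Drop partition 2 but keep its rows: for RANGE tables the rows are
        // rewritten into the next range, for LIST tables into default partition 0.
        spark.sql("ALTER TABLE t1 DROP PARTITION(2)")
        // Drop partition 2 together with its data.
        spark.sql("ALTER TABLE t1 DROP PARTITION(2) WITH DATA")
        spark.stop()
      }
    }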
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cb51b862
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cb51b862
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cb51b862

Branch: refs/heads/branch-1.2
Commit: cb51b86218cd815167f7c702b643ed0852c7f3dc
Parents: fe36e3b
Author: lionelcao <whuca...@gmail.com>
Authored: Mon Sep 4 15:38:44 2017 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Mon Sep 18 17:19:22 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/PartitionInfo.java     |   5 +
 .../hadoop/api/CarbonTableInputFormat.java      |  23 ++-
 .../spark/partition/DropPartitionCallable.java  |  39 +++++
 .../org/apache/carbondata/spark/KeyVal.scala    |   4 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 141 +++++++++++++++
 .../spark/rdd/AlterTableSplitPartitionRDD.scala | 146 ----------------
 .../spark/rdd/CarbonScanPartitionRDD.scala      |  29 ++--
 .../apache/carbondata/spark/rdd/Compactor.scala |   3 +-
 .../spark/rdd/DataManagementFunc.scala          |  50 +++---
 .../carbondata/spark/rdd/PartitionDropper.scala | 122 +++++++++++++
 .../spark/rdd/PartitionSplitter.scala           |  36 ++--
 .../carbondata/spark/util/CommonUtil.scala      |   2 +-
 .../spark/util/GlobalDictionaryUtil.scala       |   3 +-
 .../command/carbonTableSchemaCommon.scala       |  25 ++-
 .../org/apache/spark/util/PartitionUtils.scala  |  15 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   8 +-
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 103 ++++++++---
 .../execution/command/carbonTableSchema.scala   | 145 +++++++++++++++-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  16 +-
 .../partition/TestAlterPartitionTable.scala     | 171 +++++++++++++++----
 .../processing/merger/CarbonDataMergerUtil.java |   5 +-
 .../processing/spliter/RowResultProcessor.java  | 105 ++++++++++++
 .../spliter/RowResultSpliterProcessor.java      | 105 ------------
 .../exception/AlterPartitionSliceException.java |  78 +++++++++
 .../exception/SliceSpliterException.java        |  78 --------
 26 files changed, 978 insertions(+), 481 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 4b0bc3e..d0c4447 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -92,6 +92,11 @@ public class PartitionInfo implements Serializable {
     numPartitions = numPartitions - 1 + newPartitionNumbers;
   }
 
+  public void dropPartition(int index) {
+    partitionIds.remove(index);
+    numPartitions--;
+  }
+
   public List<ColumnSchema> getColumnSchemaList() {
     return columnSchemaList;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index dcc75bd..9076233 100644
---
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -306,7 +306,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { // prune partitions for filter query on partition table BitSet matchedPartitions = null; if (partitionInfo != null) { - matchedPartitions = setMatchedPartitions(null, filter, partitionInfo); + matchedPartitions = setMatchedPartitions(null, filter, partitionInfo, null); if (matchedPartitions != null) { if (matchedPartitions.cardinality() == 0) { return new ArrayList<InputSplit>(); @@ -366,9 +366,11 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { TableProvider tableProvider = new SingleTableProvider(carbonTable); // prune partitions for filter query on partition table String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID); + // matchedPartitions records partitionIndex, not partitionId BitSet matchedPartitions = null; if (partitionInfo != null) { - matchedPartitions = setMatchedPartitions(partitionIds, filter, partitionInfo); + matchedPartitions = + setMatchedPartitions(partitionIds, filter, partitionInfo, oldPartitionIdList); if (matchedPartitions != null) { if (matchedPartitions.cardinality() == 0) { return new ArrayList<InputSplit>(); @@ -396,15 +398,24 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { } } + /** + * set the matched partition indices into a BitSet + * @param partitionIds from alter table command, for normal query, it's null + * @param filter from query + * @param partitionInfo + * @param oldPartitionIdList only used in alter table command + * @return + */ private BitSet setMatchedPartitions(String partitionIds, Expression filter, - PartitionInfo partitionInfo) { + PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) { BitSet matchedPartitions = null; if (null != partitionIds) { String[] partList = partitionIds.replace("[", "").replace("]", "").split(","); - // only one partitionId in current alter table statement - matchedPartitions = new BitSet(Integer.parseInt(partList[0])); + // partList[0] -> use the first element to initiate BitSet, will auto expand later + matchedPartitions = new BitSet(Integer.parseInt(partList[0].trim())); for (String partitionId : partList) { - matchedPartitions.set(Integer.parseInt(partitionId)); + Integer index = oldPartitionIdList.indexOf(Integer.parseInt(partitionId.trim())); + matchedPartitions.set(index); } } else { if (null != filter) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java new file mode 100644 index 0000000..ce66aac --- /dev/null +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
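The setMatchedPartitions change above is the subtle part of this hunk: the alter table command passes partition *ids*, while the pruning BitSet is keyed by partition *index*, so each id has to be resolved through oldPartitionIdList before being set. A minimal standalone sketch of that mapping, assuming only the java.util.BitSet behavior shown above:

    // Mirrors the id-to-index mapping in setMatchedPartitions: the first list
    // element only sizes the BitSet (it grows automatically), and each id is
    // translated to its position in the pre-alter partition id list.
    import java.util.BitSet

    object MatchedPartitionsSketch {
      def toMatchedPartitions(partitionIds: String, oldPartitionIdList: List[Int]): BitSet = {
        val partList = partitionIds.replace("[", "").replace("]", "").split(",")
        val matched = new BitSet(partList(0).trim.toInt)
        for (id <- partList) {
          // indexOf resolves a partition id to its current index
          matched.set(oldPartitionIdList.indexOf(id.trim.toInt))
        }
        matched
      }

      def main(args: Array[String]): Unit = {
        // ids 4 and 5 sit at indices 2 and 3 of the pre-alter id list
        println(toMatchedPartitions("[4,5]", List(0, 1, 4, 5)))  // {2, 3}
      }
    }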
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.partition; + +import java.util.concurrent.Callable; + +import org.apache.carbondata.spark.rdd.PartitionDropper; + +import org.apache.spark.sql.execution.command.DropPartitionCallableModel; + +public class DropPartitionCallable implements Callable<Void> { + + private DropPartitionCallableModel dropPartitionCallableModel; + + public DropPartitionCallable(DropPartitionCallableModel dropPartitionCallableModel) { + this.dropPartitionCallableModel = dropPartitionCallableModel; + } + + @Override public Void call() { + PartitionDropper.triggerPartitionDrop(dropPartitionCallableModel); + return null; + } +} + http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala index 181f6e4..7cf8c88 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala @@ -107,11 +107,11 @@ class MergeResultImpl extends MergeResult[String, Boolean] { override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value) } -trait SplitResult[K, V] extends Serializable { +trait AlterPartitionResult[K, V] extends Serializable { def getKey(key: String, value: Boolean): (K, V) } -class SplitResultImpl extends SplitResult[String, Boolean] { +class AlterPartitionResultImpl extends AlterPartitionResult[String, Boolean] { override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala new file mode 100644 index 0000000..6cf8a7a --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.rdd + +import scala.collection.JavaConverters._ +import scala.util.Random + +import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.AlterPartitionModel +import org.apache.spark.util.PartitionUtils + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.processing.spliter.RowResultProcessor +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil +import org.apache.carbondata.spark.AlterPartitionResult +import org.apache.carbondata.spark.load.CarbonLoaderUtil + +class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, + result: AlterPartitionResult[K, V], + partitionIds: Seq[String], + bucketId: Int, + identifier: AbsoluteTableIdentifier, + prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) { + + var storeLocation: String = null + val carbonLoadModel = alterPartitionModel.carbonLoadModel + val segmentId = alterPartitionModel.segmentId + val oldPartitionIds = alterPartitionModel.oldPartitionIds + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val databaseName = carbonTable.getDatabaseName + val factTableName = carbonTable.getFactTableName + val partitionInfo = carbonTable.getPartitionInfo(factTableName) + + override protected def getPartitions: Array[Partition] = { + val sc = alterPartitionModel.sqlContext.sparkContext + sc.setLocalProperty("spark.scheduler.pool", "DDL") + sc.setLocalProperty("spark.job.interruptOnCancel", "true") + firstParent[Array[AnyRef]].partitions + } + + override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava + val iter = new Iterator[(K, V)] { + val partitionId = partitionInfo.getPartitionId(split.index) + carbonLoadModel.setTaskNo(String.valueOf(partitionId)) + carbonLoadModel.setSegmentId(segmentId) + carbonLoadModel.setPartitionId("0") + val tempLocationKey = CarbonDataProcessorUtil + .getTempStoreLocationKey(carbonLoadModel.getDatabaseName, + carbonLoadModel.getTableName, + segmentId, + carbonLoadModel.getTaskNo, + false, + true) + // this property is used to determine whether temp location for carbon is inside + // container temp dir or is yarn application directory. 
+ val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") + + if (carbonUseLocalDir.equalsIgnoreCase("true")) { + + val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations(Random.nextInt(storeLocations.length)) + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index + CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) + LOGGER.info(s"Temp storeLocation taken is $storeLocation") + + val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName, + factTableName, + carbonLoadModel.getTaskNo, + "0", + segmentId, + false, + true + ) + + val loadStatus = if (rows.isEmpty) { + LOGGER.info("After repartition this split, NO target rows to write back.") + true + } else { + val segmentProperties = PartitionUtils.getSegmentProperties(identifier, + segmentId, partitionIds.toList, oldPartitionIds, partitionInfo) + val processor = new RowResultProcessor( + carbonTable, + carbonLoadModel, + segmentProperties, + tempStoreLoc, + bucketId) + try { + processor.execute(rows) + } catch { + case e: Exception => + sys.error(s"Exception when executing Row result processor ${e.getMessage}") + } finally { + CarbonLoaderUtil + .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true) + } + } + + val loadResult = segmentId + var finished = false + + override def hasNext: Boolean = { + !finished + } + + override def next(): (K, V) = { + finished = true + result.getKey(loadResult, loadStatus) + } + } + iter + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala deleted file mode 100644 index e481fc4..0000000 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
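Before the rest of the removed AlterTableSplitPartitionRDD below: the temp-store-location logic that the new AlterTableLoadPartitionRDD above and the deleted RDD share can be condensed into a standalone sketch. Here localDirs stands in for CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf), an assumption for illustration only:

    // With carbon.use.local.dir=true a random configured executor-local dir is
    // chosen; otherwise (or when none are configured) java.io.tmpdir is used.
    import scala.util.Random

    object TempStoreLocationSketch {
      def chooseStoreLocation(useLocalDir: Boolean, localDirs: Array[String],
          splitIndex: Int): String = {
        var storeLocation: String = null
        if (useLocalDir && localDirs != null && localDirs.nonEmpty) {
          storeLocation = localDirs(Random.nextInt(localDirs.length))
        }
        if (storeLocation == null) {
          storeLocation = System.getProperty("java.io.tmpdir")
        }
        // nanoTime plus the split index keeps concurrent tasks from colliding
        storeLocation + '/' + System.nanoTime() + '/' + splitIndex
      }

      def main(args: Array[String]): Unit = {
        println(chooseStoreLocation(useLocalDir = false, Array.empty, 0))
      }
    }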
- */ - -package org.apache.carbondata.spark.rdd - -import scala.collection.JavaConverters._ -import scala.util.Random - -import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext} -import org.apache.spark.rdd.RDD -import org.apache.spark.util.PartitionUtils - -import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier -import org.apache.carbondata.core.metadata.schema.PartitionInfo -import org.apache.carbondata.core.mutate.CarbonUpdateUtil -import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.processing.model.CarbonLoadModel -import org.apache.carbondata.processing.spliter.RowResultSpliterProcessor -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.SplitResult -import org.apache.carbondata.spark.load.CarbonLoaderUtil - -class AlterTableSplitPartitionRDD[K, V]( - sc: SparkContext, - result: SplitResult[K, V], - partitionIds: Seq[String], - segmentId: String, - bucketId: Int, - carbonLoadModel: CarbonLoadModel, - identifier: AbsoluteTableIdentifier, - storePath: String, - oldPartitionIdList: List[Int], - prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) { - - sc.setLocalProperty("spark.scheduler.pool", "DDL") - sc.setLocalProperty("spark.job.interruptOnCancel", "true") - - var storeLocation: String = null - var splitResult: String = null - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val databaseName = carbonTable.getDatabaseName - val factTableName = carbonTable.getFactTableName - val partitionInfo = carbonTable.getPartitionInfo(factTableName) - - override protected def getPartitions: Array[Partition] = firstParent[Array[AnyRef]].partitions - - override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava - val iter = new Iterator[(K, V)] { - val partitionId = partitionInfo.getPartitionId(split.index) - carbonLoadModel.setTaskNo(String.valueOf(partitionId)) - carbonLoadModel.setSegmentId(segmentId) - carbonLoadModel.setPartitionId("0") - val tempLocationKey = CarbonDataProcessorUtil - .getTempStoreLocationKey(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, - segmentId, - carbonLoadModel.getTaskNo, - false, - true) - // this property is used to determine whether temp location for carbon is inside - // container temp dir or is yarn application directory. 
- val carbonUseLocalDir = CarbonProperties.getInstance() - .getProperty("carbon.use.local.dir", "false") - - if (carbonUseLocalDir.equalsIgnoreCase("true")) { - - val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf) - if (null != storeLocations && storeLocations.nonEmpty) { - storeLocation = storeLocations(Random.nextInt(storeLocations.length)) - } - if (storeLocation == null) { - storeLocation = System.getProperty("java.io.tmpdir") - } - } else { - storeLocation = System.getProperty("java.io.tmpdir") - } - storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index - CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) - LOGGER.info(s"Temp storeLocation taken is $storeLocation") - - val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName, - factTableName, - carbonLoadModel.getTaskNo, - "0", - segmentId, - false, - true - ) - - val splitStatus = if (rows.isEmpty) { - LOGGER.info("After repartition this split, NO target rows to write back.") - true - } else { - try { - val segmentProperties = PartitionUtils.getSegmentProperties(identifier, - segmentId, partitionIds.toList, oldPartitionIdList, partitionInfo) - val processor = new RowResultSpliterProcessor( - carbonTable, - carbonLoadModel, - segmentProperties, - tempStoreLoc, - bucketId - ) - processor.execute(rows) - } catch { - case e: Exception => - sys.error(s"Exception when executing Row result processor ${e.getMessage}") - } finally { - CarbonLoaderUtil - .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true) - } - - } - - val splitResult = segmentId - var finished = false - - override def hasNext: Boolean = { - !finished - } - - override def next(): (K, V) = { - finished = true - result.getKey(splitResult, splitStatus) - } - } - iter - } -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala index 2a39db5..86bc79f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD +import org.apache.spark.sql.execution.command.AlterPartitionModel import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.PartitionUtils @@ -53,27 +54,23 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil /** * This RDD is used in alter table partition statement to get data of target partitions, * then repartition data according to new partitionInfo - * @param sc + * @param alterPartitionModel + * @param carbonTableIdentifier * @param partitionIds the ids of target partition to be scanned - * @param storePath - * @param segmentId * @param bucketId - * @param oldPartitionIdList the taskId in partition order before partitionInfo is modified - * @param carbonTableIdentifier - * @param carbonLoadModel */ -class CarbonScanPartitionRDD( - sc: 
SparkContext, - partitionIds: Seq[String], - storePath: String, - segmentId: String, - bucketId: Int, - oldPartitionIdList: List[Int], +class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel, carbonTableIdentifier: CarbonTableIdentifier, - carbonLoadModel: CarbonLoadModel) - extends RDD[(AnyRef, Array[AnyRef])](sc, Nil) { + partitionIds: Seq[String], + bucketId: Int) + extends RDD[(AnyRef, Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) { - private val queryId = sc.getConf.get("queryId", System.nanoTime() + "") + private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf + .get("queryId", System.nanoTime() + "") + val segmentId = alterPartitionModel.segmentId + val carbonLoadModel = alterPartitionModel.carbonLoadModel + val oldPartitionIdList = alterPartitionModel.oldPartitionIds + val storePath = carbonLoadModel.getStorePath val identifier = new AbsoluteTableIdentifier(storePath, carbonTableIdentifier) var storeLocation: String = null var splitStatus: Boolean = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala index c13a942..fb610c1 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala @@ -36,14 +36,13 @@ object Compactor { def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = { - val storePath = compactionCallableModel.storePath val storeLocation = compactionCallableModel.storeLocation val carbonTable = compactionCallableModel.carbonTable val loadsToMerge = compactionCallableModel.loadsToMerge val sc = compactionCallableModel.sqlContext val carbonLoadModel = compactionCallableModel.carbonLoadModel val compactionType = compactionCallableModel.compactionType - + val storePath = carbonLoadModel.getStorePath val startTime = System.nanoTime() val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge) var finalMergeStatus = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala index bca119e..c2b7b74 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, SplitPartitionCallableModel} +import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel, DropPartitionCallableModel, SplitPartitionCallableModel} import org.apache.carbondata.common.logging.LogServiceFactory import 
org.apache.carbondata.core.constants.CarbonCommonConstants @@ -39,7 +39,7 @@ import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadM import org.apache.carbondata.spark._ import org.apache.carbondata.spark.compaction.CompactionCallable import org.apache.carbondata.spark.load._ -import org.apache.carbondata.spark.partition.SplitPartitionCallable +import org.apache.carbondata.spark.partition.{DropPartitionCallable, SplitPartitionCallable} import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil} /** @@ -149,7 +149,6 @@ object DataManagementFunc { } def executeCompaction(carbonLoadModel: CarbonLoadModel, - storePath: String, compactionModel: CompactionModel, executor: ExecutorService, sqlContext: SQLContext, @@ -161,7 +160,6 @@ object DataManagementFunc { var segList = carbonLoadModel.getLoadMetadataDetails var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged( - storePath, carbonLoadModel, compactionModel.compactionSize, segList, @@ -180,7 +178,6 @@ object DataManagementFunc { scanSegmentsAndSubmitJob(futureList, loadsToMerge, executor, - storePath, sqlContext, compactionModel, carbonLoadModel, @@ -200,7 +197,7 @@ object DataManagementFunc { } // scan again and determine if anything is there to merge again. - CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) segList = carbonLoadModel.getLoadMetadataDetails // in case of major compaction we will scan only once and come out as it will keep // on doing major for the new loads also. @@ -215,7 +212,6 @@ object DataManagementFunc { loadsToMerge.clear() } else if (segList.size > 0) { loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged( - storePath, carbonLoadModel, compactionModel.compactionSize, segList, @@ -234,10 +230,8 @@ object DataManagementFunc { * @param futureList */ private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], - loadsToMerge: util - .List[LoadMetadataDetails], + loadsToMerge: util.List[LoadMetadataDetails], executor: ExecutorService, - storePath: String, sqlContext: SQLContext, compactionModel: CompactionModel, carbonLoadModel: CarbonLoadModel, @@ -248,8 +242,7 @@ object DataManagementFunc { } ) - val compactionCallableModel = CompactionCallableModel(storePath, - carbonLoadModel, + val compactionCallableModel = CompactionCallableModel(carbonLoadModel, storeLocation, compactionModel.carbonTable, loadsToMerge, @@ -264,14 +257,13 @@ object DataManagementFunc { def executePartitionSplit( sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, executor: ExecutorService, - storePath: String, segment: String, partitionId: String, oldPartitionIdList: List[Int]): Unit = { val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]]( CarbonCommonConstants.DEFAULT_COLLECTION_SIZE ) - scanSegmentsForSplitPartition(futureList, executor, storePath, segment, partitionId, + scanSegmentsForSplitPartition(futureList, executor, segment, partitionId, sqlContext, carbonLoadModel, oldPartitionIdList) try { futureList.asScala.foreach(future => { @@ -287,15 +279,13 @@ object DataManagementFunc { private def scanSegmentsForSplitPartition(futureList: util.List[Future[Void]], executor: ExecutorService, - storePath: String, segmentId: String, partitionId: String, sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, oldPartitionIdList: List[Int]): Unit = { - val splitModel = SplitPartitionCallableModel(storePath, - carbonLoadModel, + val splitModel = 
SplitPartitionCallableModel(carbonLoadModel, segmentId, partitionId, oldPartitionIdList, @@ -305,9 +295,27 @@ object DataManagementFunc { futureList.add(future) } - def prepareCarbonLoadModel(storePath: String, - table: CarbonTable, - newCarbonLoadModel: CarbonLoadModel): Unit = { + def executeDroppingPartition(sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + executor: ExecutorService, + segmentId: String, + partitionId: String, + dropWithData: Boolean, + oldPartitionIds: List[Int]): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val model = new DropPartitionCallableModel(carbonLoadModel, + segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, sqlContext) + val future: Future[Void] = executor.submit(new DropPartitionCallable(model)) + try { + future.get + } catch { + case e: Exception => + LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }") + throw e + } + } + + def prepareCarbonLoadModel(table: CarbonTable, newCarbonLoadModel: CarbonLoadModel): Unit = { newCarbonLoadModel.setTableName(table.getFactTableName) val dataLoadSchema = new CarbonDataLoadSchema(table) // Need to fill dimension relation @@ -315,7 +323,7 @@ object DataManagementFunc { newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName) newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName) newCarbonLoadModel.setStorePath(table.getStorePath) - CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(newCarbonLoadModel) val loadStartTime = CarbonUpdateUtil.readCurrentTime(); newCarbonLoadModel.setFactTimeStamp(loadStartTime) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala new file mode 100644 index 0000000..0a41f44 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
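The PartitionDropper added below decides where a dropped partition's surviving rows go: for RANGE partitioning they are rewritten into the next range (or into default partition "0" when the last range is dropped), and for LIST partitioning they always go to the default partition. A standalone sketch of that selection, using stand-in PartitionType values in place of the Carbon enum:

    object TargetPartitionSketch {
      sealed trait PartitionType
      case object RANGE extends PartitionType
      case object LIST extends PartitionType

      def targetPartitionId(partitionType: PartitionType, partitionId: String,
          oldPartitionIds: List[Int]): String = {
        // resolve the dropped partition id to its index in the old id list
        val partitionIndex = oldPartitionIds.indexOf(partitionId.toInt)
        partitionType match {
          case RANGE =>
            if (partitionIndex == oldPartitionIds.length - 1) "0"
            else String.valueOf(oldPartitionIds(partitionIndex + 1))
          case LIST => "0"
        }
      }

      def main(args: Array[String]): Unit = {
        println(targetPartitionId(RANGE, "2", List(0, 1, 2, 3)))  // 3
        println(targetPartitionId(RANGE, "3", List(0, 1, 2, 3)))  // 0 (last range)
        println(targetPartitionId(LIST, "2", List(0, 1, 2, 3)))   // 0
      }
    }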
+ */ + +package org.apache.carbondata.spark.rdd + +import java.io.IOException + +import org.apache.spark.sql.execution.command.{AlterPartitionModel, DropPartitionCallableModel} +import org.apache.spark.util.PartitionUtils + +import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.metadata.schema.partition.PartitionType +import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory} + +object PartitionDropper { + + val logger = LogServiceFactory.getLogService(PartitionDropper.getClass.getName) + + def triggerPartitionDrop(dropPartitionCallableModel: DropPartitionCallableModel): Unit = { + val alterPartitionModel = new AlterPartitionModel(dropPartitionCallableModel.carbonLoadModel, + dropPartitionCallableModel.segmentId, + dropPartitionCallableModel.oldPartitionIds, + dropPartitionCallableModel.sqlContext + ) + val partitionId = dropPartitionCallableModel.partitionId + val oldPartitionIds = dropPartitionCallableModel.oldPartitionIds + val dropWithData = dropPartitionCallableModel.dropWithData + val carbonTable = dropPartitionCallableModel.carbonTable + val dbName = carbonTable.getDatabaseName + val tableName = carbonTable.getFactTableName + val identifier = carbonTable.getAbsoluteTableIdentifier + val carbonTableIdentifier = identifier.getCarbonTableIdentifier + val partitionInfo = carbonTable.getPartitionInfo(tableName) + val partitioner = PartitionFactory.getPartitioner(partitionInfo) + + var finalDropStatus = false + val bucketInfo = carbonTable.getBucketingInfo(tableName) + val bucketNumber = bucketInfo match { + case null => 1 + case _ => bucketInfo.getNumberOfBuckets + } + val partitionIndex = oldPartitionIds.indexOf(Integer.valueOf(partitionId)) + val targetPartitionId = partitionInfo.getPartitionType match { + case PartitionType.RANGE => if (partitionIndex == oldPartitionIds.length - 1) { + "0" + } else { + String.valueOf(oldPartitionIds(partitionIndex + 1)) + } + case PartitionType.LIST => "0" + } + + if (!dropWithData) { + try { + for (i <- 0 until bucketNumber) { + val bucketId = i + val rdd = new CarbonScanPartitionRDD(alterPartitionModel, + carbonTableIdentifier, + Seq(partitionId, targetPartitionId), + bucketId + ).partitionBy(partitioner).map(_._2) + + val dropStatus = new AlterTableLoadPartitionRDD(alterPartitionModel, + new AlterPartitionResultImpl(), + Seq(partitionId), + bucketId, + identifier, + rdd).collect() + + if (dropStatus.length == 0) { + finalDropStatus = false + } else { + finalDropStatus = dropStatus.forall(_._2) + } + if (!finalDropStatus) { + logger.audit(s"Drop Partition request failed for table " + + s"${ dbName }.${ tableName }") + logger.error(s"Drop Partition request failed for table " + + s"${ dbName }.${ tableName }") + } + } + + if (finalDropStatus) { + try { + PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, identifier, + Seq(partitionId, targetPartitionId).toList, dbName, + tableName, partitionInfo) + } catch { + case e: IOException => sys.error(s"Exception while delete original carbon files " + + e.getMessage) + } + logger.audit(s"Drop Partition request completed for table " + + s"${ dbName }.${ tableName }") + logger.info(s"Drop Partition request completed for table " + + s"${ dbName }.${ tableName }") + } + } catch { + case e: Exception => sys.error(s"Exception in dropping partition action: ${ e.getMessage }") + } + } else { + PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, identifier, + Seq(partitionId).toList, dbName, tableName, partitionInfo) + 
logger.audit(s"Drop Partition request completed for table " + + s"${ dbName }.${ tableName }") + logger.info(s"Drop Partition request completed for table " + + s"${ dbName }.${ tableName }") + } + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala index 48e1bee..fca7542 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala @@ -19,22 +19,24 @@ package org.apache.carbondata.spark.rdd import java.io.IOException -import org.apache.spark.sql.execution.command.SplitPartitionCallableModel +import org.apache.spark.sql.execution.command.{AlterPartitionModel, SplitPartitionCallableModel} import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.spark.{PartitionFactory, SplitResultImpl} +import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory} object PartitionSplitter { val logger = LogServiceFactory.getLogService(PartitionSplitter.getClass.getName) def triggerPartitionSplit(splitPartitionCallableModel: SplitPartitionCallableModel): Unit = { - val sc = splitPartitionCallableModel.sqlContext.sparkContext + + val alterPartitionModel = new AlterPartitionModel(splitPartitionCallableModel.carbonLoadModel, + splitPartitionCallableModel.segmentId, + splitPartitionCallableModel.oldPartitionIds, + splitPartitionCallableModel.sqlContext + ) val partitionId = splitPartitionCallableModel.partitionId - val storePath = splitPartitionCallableModel.storePath - val segmentId = splitPartitionCallableModel.segmentId - val oldPartitionIdList = splitPartitionCallableModel.oldPartitionIdList val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val identifier = carbonTable.getAbsoluteTableIdentifier @@ -53,25 +55,17 @@ object PartitionSplitter { for (i <- 0 until bucketNumber) { val bucketId = i val rdd = new CarbonScanPartitionRDD( - sc, - Seq(partitionId), - storePath, - segmentId, - bucketId, - oldPartitionIdList, + alterPartitionModel, carbonTableIdentifier, - carbonLoadModel + Seq(partitionId), + bucketId ).partitionBy(partitioner).map(_._2) - val splitStatus = new AlterTableSplitPartitionRDD(sc, - new SplitResultImpl(), + val splitStatus = new AlterTableLoadPartitionRDD(alterPartitionModel, + new AlterPartitionResultImpl(), Seq(partitionId), - segmentId, bucketId, - carbonLoadModel, identifier, - storePath, - oldPartitionIdList, rdd).collect() if (splitStatus.length == 0) { @@ -89,8 +83,8 @@ object PartitionSplitter { if (finalSplitStatus) { try { PartitionUtils. 
- deleteOriginalCarbonFile(identifier, segmentId, Seq(partitionId).toList, - oldPartitionIdList, storePath, databaseName, tableName, partitionInfo, carbonLoadModel) + deleteOriginalCarbonFile(alterPartitionModel, identifier, Seq(partitionId).toList + , databaseName, tableName, partitionInfo) } catch { case e: IOException => sys.error(s"Exception while delete original carbon files " + e.getMessage) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index fd265a8..f123624 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -537,7 +537,7 @@ object CommonUtil { } } - def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = { + def readLoadMetadataDetails(model: CarbonLoadModel): Unit = { val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath val details = SegmentStatusManager.readLoadMetadata(metadataPath) model.setLoadMetadataDetails(details.toList.asJava) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala index 47eaece..601c0c7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala @@ -314,7 +314,6 @@ object GlobalDictionaryUtil { isComplexes += dimensions(i).isComplex } } - val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table) val primDimensions = primDimensionsBuffer.map { x => x }.toArray val dictDetail = CarbonSparkFactory.getDictionaryDetailService. 
getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation) @@ -330,7 +329,7 @@ object GlobalDictionaryUtil { carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1) // get load count if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) } DictionaryLoadModel(table, dimensions, http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index cc2cc82..f5d69ef 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -118,19 +118,31 @@ case class CompactionModel(compactionSize: Long, carbonTable: CarbonTable, isDDLTrigger: Boolean) -case class CompactionCallableModel(storePath: String, - carbonLoadModel: CarbonLoadModel, +case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel, storeLocation: String, carbonTable: CarbonTable, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext, compactionType: CompactionType) -case class SplitPartitionCallableModel(storePath: String, - carbonLoadModel: CarbonLoadModel, +case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel, + segmentId: String, + oldPartitionIds: List[Int], + sqlContext: SQLContext +) + +case class SplitPartitionCallableModel(carbonLoadModel: CarbonLoadModel, segmentId: String, partitionId: String, - oldPartitionIdList: List[Int], + oldPartitionIds: List[Int], + sqlContext: SQLContext) + +case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel, + segmentId: String, + partitionId: String, + oldPartitionIds: List[Int], + dropWithData: Boolean, + carbonTable: CarbonTable, sqlContext: SQLContext) case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0) @@ -160,7 +172,8 @@ case class AlterTableDropColumnModel(databaseName: Option[String], case class AlterTableDropPartitionModel(databaseName: Option[String], tableName: String, - partitionId: String) + partitionId: String, + dropWithData: Boolean) case class AlterTableSplitPartitionModel(databaseName: Option[String], tableName: String, http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 3982f7b..002ed27 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job +import org.apache.spark.sql.execution.command.AlterPartitionModel 
import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo} import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier @@ -151,13 +152,17 @@ object PartitionUtils { } @throws(classOf[IOException]) - def deleteOriginalCarbonFile(identifier: AbsoluteTableIdentifier, segmentId: String, - partitionIds: List[String], oldPartitionIdList: List[Int], storePath: String, - dbName: String, tableName: String, partitionInfo: PartitionInfo, - carbonLoadModel: CarbonLoadModel): Unit = { + def deleteOriginalCarbonFile(alterPartitionModel: AlterPartitionModel, + identifier: AbsoluteTableIdentifier, + partitionIds: List[String], dbName: String, tableName: String, + partitionInfo: PartitionInfo): Unit = { + val carbonLoadModel = alterPartitionModel.carbonLoadModel + val segmentId = alterPartitionModel.segmentId + val oldPartitionIds = alterPartitionModel.oldPartitionIds val newTime = carbonLoadModel.getFactTimeStamp + val storePath = carbonLoadModel.getStorePath val tableBlockInfoList = - getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIdList, + getPartitionBlockList(identifier, segmentId, partitionIds, oldPartitionIds, partitionInfo).asScala val pathList: util.List[String] = new util.ArrayList[String]() val carbonTablePath = new CarbonTablePath(storePath, dbName, tableName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index ef2a917..596cebf 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -104,7 +104,7 @@ object CarbonDataRDDFactory { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) } // reading the start time of data load. val loadStartTime = CarbonUpdateUtil.readCurrentTime(); @@ -228,7 +228,7 @@ object CarbonDataRDDFactory { compactionLock: ICarbonLock): Unit = { val executor: ExecutorService = Executors.newFixedThreadPool(1) // update the updated table status. 
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) val compactionThread = new Thread { override def run(): Unit = { @@ -238,7 +238,6 @@ object CarbonDataRDDFactory { var exception: Exception = null try { DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel, - storePath: String, compactionModel: CompactionModel, executor, sqlContext, storeLocation ) @@ -269,7 +268,7 @@ object CarbonDataRDDFactory { val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) val newCarbonLoadModel = new CarbonLoadModel() - DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel) + DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel) val compactionSize = CarbonDataMergerUtil .getCompactionSize(CompactionType.MAJOR_COMPACTION) @@ -282,7 +281,6 @@ object CarbonDataRDDFactory { // proceed for compaction try { DataManagementFunc.executeCompaction(newCarbonLoadModel, - newCarbonLoadModel.getStorePath, newcompactionModel, executor, sqlContext, storeLocation ) http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 8a39b0a..130f305 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -593,7 +593,7 @@ case class LoadTable( LOGGER.info(s"Overwrite is in progress for carbon table with $dbName.$tableName") } if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) } if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass && StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index c7b72d5..0edfccf 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -76,7 +76,6 @@ object CarbonDataRDDFactory { def alterTableForCompaction(sqlContext: SQLContext, alterTableModel: AlterTableModel, carbonLoadModel: CarbonLoadModel, - storePath: String, storeLocation: String): Unit = { var compactionSize: Long = 0 var compactionType: CompactionType = CompactionType.MINOR_COMPACTION @@ -104,7 +103,7 @@ object CarbonDataRDDFactory { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) } // reading the start time of data load. 
val loadStartTime : Long = @@ -135,7 +134,6 @@ object CarbonDataRDDFactory { LOGGER.info("System level compaction lock is enabled.") handleCompactionForSystemLocking(sqlContext, carbonLoadModel, - storePath, storeLocation, compactionType, carbonTable, @@ -154,7 +152,6 @@ object CarbonDataRDDFactory { try { startCompactionThreads(sqlContext, carbonLoadModel, - storePath, storeLocation, compactionModel, lock @@ -178,14 +175,12 @@ object CarbonDataRDDFactory { def alterTableSplitPartition(sqlContext: SQLContext, partitionId: String, carbonLoadModel: CarbonLoadModel, - storePath: String, oldPartitionIdList: List[Int]): Unit = { LOGGER.audit(s"Add partition request received for table " + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") try { startSplitThreads(sqlContext, carbonLoadModel, - storePath, partitionId, oldPartitionIdList) } catch { @@ -195,9 +190,28 @@ object CarbonDataRDDFactory { } } + def alterTableDropPartition(sqlContext: SQLContext, + partitionId: String, + carbonLoadModel: CarbonLoadModel, + dropWithData: Boolean, + oldPartitionIds: List[Int]): Unit = { + LOGGER.audit(s"Drop partition request received for table " + + s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") + try { + startDropThreads(sqlContext, + carbonLoadModel, + partitionId, + dropWithData, + oldPartitionIds) + } catch { + case e: Exception => + LOGGER.error(s"Exception in start dropping partition thread. ${ e.getMessage }") + throw e + } + } + def handleCompactionForSystemLocking(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - storePath: String, storeLocation: String, compactionType: CompactionType, carbonTable: CarbonTable, @@ -212,7 +226,6 @@ object CarbonDataRDDFactory { try { startCompactionThreads(sqlContext, carbonLoadModel, - storePath, storeLocation, compactionModel, lock @@ -248,7 +261,6 @@ object CarbonDataRDDFactory { def startCompactionThreads(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - storePath: String, storeLocation: String, compactionModel: CompactionModel, compactionLock: ICarbonLock): Unit = { @@ -257,7 +269,7 @@ object CarbonDataRDDFactory { if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA_COMPACTION) { // update the updated table status. For the case of Update Delta Compaction the Metadata // is filled in LoadModel, no need to refresh. 
- CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) } val compactionThread = new Thread { @@ -269,7 +281,6 @@ object CarbonDataRDDFactory { var exception: Exception = null try { DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel, - storePath: String, compactionModel: CompactionModel, executor, sqlContext, storeLocation ) @@ -301,7 +312,7 @@ object CarbonDataRDDFactory { val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath) val newCarbonLoadModel = new CarbonLoadModel() - DataManagementFunc.prepareCarbonLoadModel(storePath, table, newCarbonLoadModel) + DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel) val compactionSize = CarbonDataMergerUtil .getCompactionSize(CompactionType.MAJOR_COMPACTION) @@ -314,7 +325,6 @@ object CarbonDataRDDFactory { // proceed for compaction try { DataManagementFunc.executeCompaction(newCarbonLoadModel, - newCarbonLoadModel.getStorePath, newcompactionModel, executor, sqlContext, storeLocation ) @@ -365,7 +375,6 @@ object CarbonDataRDDFactory { case class SplitThread(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, executor: ExecutorService, - storePath: String, segmentId: String, partitionId: String, oldPartitionIdList: List[Int]) extends Thread { @@ -374,8 +383,7 @@ object CarbonDataRDDFactory { var exception: Exception = null try { DataManagementFunc.executePartitionSplit(sqlContext, - carbonLoadModel, executor, storePath, segmentId, partitionId, - oldPartitionIdList) + carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList) triggeredSplitPartitionStatus = true } catch { case e: Exception => @@ -388,9 +396,26 @@ object CarbonDataRDDFactory { } } + case class dropPartitionThread(sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + executor: ExecutorService, + segmentId: String, + partitionId: String, + dropWithData: Boolean, + oldPartitionIds: List[Int]) extends Thread { + override def run(): Unit = { + try { + DataManagementFunc.executeDroppingPartition(sqlContext, carbonLoadModel, executor, + segmentId, partitionId, dropWithData, oldPartitionIds) + } catch { + case e: Exception => + LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }") + } + } + } + def startSplitThreads(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - storePath: String, partitionId: String, oldPartitionIdList: List[Int]): Unit = { val numberOfCores = CarbonProperties.getInstance() @@ -405,7 +430,7 @@ object CarbonDataRDDFactory { val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size) var i = 0 validSegments.foreach { segmentId => - threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor, storePath, + threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList) threadArray(i).start() i += 1 @@ -429,6 +454,46 @@ object CarbonDataRDDFactory { } } + def startDropThreads(sqlContext: SQLContext, + carbonLoadModel: CarbonLoadModel, + partitionId: String, + dropWithData: Boolean, + oldPartitionIds: List[Int]): Unit = { + val numberOfCores = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION, + CarbonCommonConstants.DEFAULT_NUMBER_CORES) + val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt) + try { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier + val 
+      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
+      var i = 0
+      for (segmentId: String <- validSegments) {
+        threadArray(i) = dropPartitionThread(sqlContext, carbonLoadModel, executor,
+          segmentId, partitionId, dropWithData, oldPartitionIds)
+        threadArray(i).start()
+        i += 1
+      }
+      for (thread <- threadArray) {
+        thread.join()
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+
   def loadCarbonData(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
@@ -473,7 +538,6 @@ object CarbonDataRDDFactory {
             handleCompactionForSystemLocking(sqlContext,
               carbonLoadModel,
-              storePath,
               storeLocation,
               CompactionType.MINOR_COMPACTION,
               carbonTable,
@@ -490,7 +554,6 @@ object CarbonDataRDDFactory {
               try {
                 startCompactionThreads(sqlContext,
                   carbonLoadModel,
-                  storePath,
                   storeLocation,
                   compactionModel,
                   lock

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 3f0153e..7ed280e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -21,7 +21,7 @@ import java.text.SimpleDateFormat
 import java.util

 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.language.implicitConversions

 import org.apache.commons.lang3.StringUtils
@@ -52,6 +52,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -177,7 +178,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
           .alterTableForCompaction(sparkSession.sqlContext,
             alterTableModel,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             storeLocation
           )
       } catch {
@@ -301,7 +301,6 @@ case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
         CarbonDataRDDFactory.alterTableSplitPartition(sparkSession.sqlContext,
           partitionId.toString,
           carbonLoadModel,
-          relation.tableMeta.storePath,
           oldPartitionIds.asScala.toList
         )
         success = true
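[Annotation, not part of the commit] The command class added below is driven by new DDL;
a sketch of the statements the grammar accepts, per the CarbonSpark2SqlParser change at
the end of this diff. Database, table, and partition id are illustrative, and `carbon`
is assumed to be a CarbonSession-backed SparkSession:

    // drops only the partition definition (dropWithData = false)
    carbon.sql("ALTER TABLE mydb.sales DROP PARTITION (2)")
    // also removes the partition's data (WITH DATA sets dropWithData = true)
    carbon.sql("ALTER TABLE mydb.sales DROP PARTITION (2) WITH DATA")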
@@ -313,6 +312,7 @@ case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
       AlterTableUtil.releaseLocks(locks)
       CacheProvider.getInstance().dropAllCache()
       LOGGER.info("Locks released after alter table add/split partition action.")
+      LOGGER.audit("Locks released after alter table add/split partition action.")
       if (success) {
         LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
         LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
@@ -322,7 +322,142 @@ case class AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
   }
 }

-case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand
+case class AlterTableDropPartition(alterTableDropPartitionModel: AlterTableDropPartitionModel)
+  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  val tableName = alterTableDropPartitionModel.tableName
+  var dbName: String = null
+  val partitionId = alterTableDropPartitionModel.partitionId
+  val dropWithData = alterTableDropPartitionModel.dropWithData
+  if (partitionId.toInt == 0) {
+    sys.error(s"Cannot drop default partition! Please use a delete statement!")
+  }
+  var partitionInfo: PartitionInfo = null
+  var carbonMetaStore: CarbonMetaStore = null
+  var relation: CarbonRelation = null
+  var storePath: String = null
+  var table: CarbonTable = null
+  var carbonTableIdentifier: CarbonTableIdentifier = null
+  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+  val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+    LockUsage.COMPACTION_LOCK,
+    LockUsage.DELETE_SEGMENT_LOCK,
+    LockUsage.DROP_TABLE_LOCK,
+    LockUsage.CLEAN_FILES_LOCK,
+    LockUsage.ALTER_PARTITION_LOCK)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+    processData(sparkSession)
+    Seq.empty
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    dbName = alterTableDropPartitionModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    storePath = relation.tableMeta.storePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)) {
+      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    }
+    table = relation.tableMeta.carbonTable
+    partitionInfo = table.getPartitionInfo(tableName)
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of partitionIdList before updating partitionInfo;
+    // it will be used in the partition data scan
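// [Illustrative walk-through, values assumed, not from the commit] For a RANGE table
// with rangeInfo = ["2015", "2016", "2017"], getPartitionIds returns [0, 1, 2, 3],
// where id 0 is the default partition. indexOf is used below because ids stop being
// contiguous once partitions have been added or split:
//   val partitionIds = List(0, 1, 2, 3)
//   val partitionIndex = partitionIds.indexOf(2)      // 2
//   val rangeInfo = List("2015", "2016", "2017")
//   val rangeToRemove = rangeInfo(partitionIndex - 1) // "2016"; the offset of -1 exists
//                                                     // because the default partition has
//                                                     // no entry in rangeInfo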
+    oldPartitionIds.addAll(partitionIds.asJava)
+    val partitionIndex = partitionIds.indexOf(Integer.valueOf(partitionId))
+    partitionInfo.getPartitionType match {
+      case PartitionType.HASH => sys.error(s"Hash partition cannot be dropped!")
+      case PartitionType.RANGE =>
+        val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
+        val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
+        rangeInfo.remove(rangeToRemove)
+        partitionInfo.setRangeInfo(rangeInfo)
+      case PartitionType.LIST =>
+        val listInfo = new util.ArrayList(partitionInfo.getListInfo)
+        val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
+        listInfo.remove(listToRemove)
+        partitionInfo.setListInfo(listInfo)
+      case PartitionType.RANGE_INTERVAL =>
+        sys.error(s"Dropping range interval partition isn't supported yet!")
+    }
+    partitionInfo.dropPartition(partitionIndex)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read TableInfo
+    val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, storePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
+    thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+      .setTime_stamp(System.currentTimeMillis)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, storePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    // sparkSession.catalog.refreshTable(tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      // Need to fill dimension relation
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(storePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      CarbonDataRDDFactory.alterTableDropPartition(sparkSession.sqlContext,
+        partitionId,
+        carbonLoadModel,
+        dropWithData,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        sys.error(s"Drop Partition failed. Please check logs for more info. ${ e.getMessage }")
${ e.getMessage } ") + success = false + } finally { + CacheProvider.getInstance().dropAllCache() + AlterTableUtil.releaseLocks(locks) + LOGGER.info("Locks released after alter table drop partition action.") + LOGGER.audit("Locks released after alter table drop partition action.") + } + LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName") + LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName") + Seq.empty + } +} + + case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends RunnableCommand with SchemaProcessCommand { def run(sparkSession: SparkSession): Seq[Row] = { @@ -796,7 +931,7 @@ case class LoadTable( LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress") } if (null == carbonLoadModel.getLoadMetadataDetails) { - CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath) + CommonUtil.readLoadMetadataDetails(carbonLoadModel) } if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass && StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index 1d74bee..24b2981 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -72,7 +72,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns protected lazy val alterPartition: Parser[LogicalPlan] = - alterAddPartition | alterSplitPartition + alterAddPartition | alterSplitPartition | alterDropPartition protected lazy val alterAddPartition: Parser[LogicalPlan] = ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~> @@ -95,6 +95,20 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { AlterTableSplitPartitionCommand(alterTableSplitPartitionModel) } + protected lazy val alterDropPartition: Parser[LogicalPlan] = + ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (DROP ~> PARTITION ~> + "(" ~> numericLit <~ ")") ~ (WITH ~> DATA).? <~ opt(";") ^^ { + case dbName ~ table ~ partitionId ~ withData => + val dropWithData = withData.getOrElse("NO") match { + case "NO" => false + case _ => true + } + val alterTableDropPartitionModel = + AlterTableDropPartitionModel(dbName, table, partitionId, dropWithData) + AlterTableDropPartition(alterTableDropPartitionModel) + } + + protected lazy val alterTable: Parser[LogicalPlan] = ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ opt(";") ^^ { case dbName ~ table ~ (compact ~ compactType) =>