[CARBONDATA-1316] Support drop partition function

This closes #1317


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cb51b862
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cb51b862
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cb51b862

Branch: refs/heads/streaming_ingest
Commit: cb51b86218cd815167f7c702b643ed0852c7f3dc
Parents: fe36e3b
Author: lionelcao <whuca...@gmail.com>
Authored: Mon Sep 4 15:38:44 2017 +0800
Committer: QiangCai <qiang...@qq.com>
Committed: Mon Sep 18 17:19:22 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/schema/PartitionInfo.java     |   5 +
 .../hadoop/api/CarbonTableInputFormat.java      |  23 ++-
 .../spark/partition/DropPartitionCallable.java  |  39 +++++
 .../org/apache/carbondata/spark/KeyVal.scala    |   4 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 141 +++++++++++++++
 .../spark/rdd/AlterTableSplitPartitionRDD.scala | 146 ----------------
 .../spark/rdd/CarbonScanPartitionRDD.scala      |  29 ++--
 .../apache/carbondata/spark/rdd/Compactor.scala |   3 +-
 .../spark/rdd/DataManagementFunc.scala          |  50 +++---
 .../carbondata/spark/rdd/PartitionDropper.scala | 122 +++++++++++++
 .../spark/rdd/PartitionSplitter.scala           |  36 ++--
 .../carbondata/spark/util/CommonUtil.scala      |   2 +-
 .../spark/util/GlobalDictionaryUtil.scala       |   3 +-
 .../command/carbonTableSchemaCommon.scala       |  25 ++-
 .../org/apache/spark/util/PartitionUtils.scala  |  15 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   8 +-
 .../execution/command/carbonTableSchema.scala   |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 103 ++++++++---
 .../execution/command/carbonTableSchema.scala   | 145 +++++++++++++++-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  16 +-
 .../partition/TestAlterPartitionTable.scala     | 171 +++++++++++++++----
 .../processing/merger/CarbonDataMergerUtil.java |   5 +-
 .../processing/spliter/RowResultProcessor.java  | 105 ++++++++++++
 .../spliter/RowResultSpliterProcessor.java      | 105 ------------
 .../exception/AlterPartitionSliceException.java |  78 +++++++++
 .../exception/SliceSpliterException.java        |  78 ---------
 26 files changed, 978 insertions(+), 481 deletions(-)
----------------------------------------------------------------------
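For a quick sense of the feature added here: the new AlterTableDropPartitionModel carries a
partitionId and a dropWithData flag, and the CarbonSpark2SqlParser change wires them to DDL
along the following lines. This is an illustrative sketch only -- the table name and partition
id are placeholders, a CarbonData-enabled session named `carbon` is assumed, and the exact
grammar lives in the parser and tests below:

    carbon.sql("ALTER TABLE sales DROP PARTITION(2)")            // keep data: rows move to a neighbouring/default partition
    carbon.sql("ALTER TABLE sales DROP PARTITION(2) WITH DATA")  // drop the partition together with its rows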


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
index 4b0bc3e..d0c4447 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/PartitionInfo.java
@@ -92,6 +92,11 @@ public class PartitionInfo implements Serializable {
     numPartitions = numPartitions - 1 + newPartitionNumbers;
   }
 
+  public void dropPartition(int index) {
+    partitionIds.remove(index);
+    numPartitions--;
+  }
+
   public List<ColumnSchema> getColumnSchemaList() {
     return columnSchemaList;
   }
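A minimal sketch of how the new hook is meant to be used (the caller wiring is assumed; note
that the argument is a position within partitionIds, not a partition id):

    // hypothetical caller holding an existing PartitionInfo instance
    val droppedId = partitionInfo.getPartitionId(2)   // id stored at index 2
    partitionInfo.dropPartition(2)                    // removes that id and decrements numPartitions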

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index dcc75bd..9076233 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -306,7 +306,7 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     // prune partitions for filter query on partition table
     BitSet matchedPartitions = null;
     if (partitionInfo != null) {
-      matchedPartitions = setMatchedPartitions(null, filter, partitionInfo);
+      matchedPartitions = setMatchedPartitions(null, filter, partitionInfo, 
null);
       if (matchedPartitions != null) {
         if (matchedPartitions.cardinality() == 0) {
           return new ArrayList<InputSplit>();
@@ -366,9 +366,11 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
       TableProvider tableProvider = new SingleTableProvider(carbonTable);
       // prune partitions for filter query on partition table
       String partitionIds = job.getConfiguration().get(ALTER_PARTITION_ID);
+      // matchedPartitions records partitionIndex, not partitionId
       BitSet matchedPartitions = null;
       if (partitionInfo != null) {
-        matchedPartitions = setMatchedPartitions(partitionIds, filter, 
partitionInfo);
+        matchedPartitions =
+            setMatchedPartitions(partitionIds, filter, partitionInfo, 
oldPartitionIdList);
         if (matchedPartitions != null) {
           if (matchedPartitions.cardinality() == 0) {
             return new ArrayList<InputSplit>();
@@ -396,15 +398,24 @@ public class CarbonTableInputFormat<T> extends 
FileInputFormat<Void, T> {
     }
   }
 
+  /**
+   * Set the indices of the matched partitions into a BitSet.
+   * @param partitionIds  partition ids from the alter table command; null for a normal query
+   * @param filter  filter expression from the query
+   * @param partitionInfo  partition information of the table
+   * @param oldPartitionIdList  partition ids in pre-alter order; only used by the alter table flow
+   * @return a BitSet in which the indices of the matched partitions are set
+   */
   private BitSet setMatchedPartitions(String partitionIds, Expression filter,
-      PartitionInfo partitionInfo) {
+      PartitionInfo partitionInfo, List<Integer> oldPartitionIdList) {
     BitSet matchedPartitions = null;
     if (null != partitionIds) {
       String[] partList = partitionIds.replace("[", "").replace("]", 
"").split(",");
-      // only one partitionId in current alter table statement
-      matchedPartitions = new BitSet(Integer.parseInt(partList[0]));
+      // use the first element (partList[0]) to initialize the BitSet; it expands automatically later
+      matchedPartitions = new BitSet(Integer.parseInt(partList[0].trim()));
       for (String partitionId : partList) {
-        matchedPartitions.set(Integer.parseInt(partitionId));
+        Integer index = 
oldPartitionIdList.indexOf(Integer.parseInt(partitionId.trim()));
+        matchedPartitions.set(index);
       }
     } else {
       if (null != filter) {
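A self-contained illustration (plain Scala with made-up values) of the id-to-index translation
that setMatchedPartitions now performs with the new oldPartitionIdList argument:

    import java.util.BitSet

    val oldPartitionIdList = List(0, 1, 4, 5, 2, 3)   // hypothetical ids in pre-alter task order
    val alterPartitionIds  = "[4,5]"                  // hypothetical ALTER_PARTITION_ID value
    val matched = new BitSet()
    alterPartitionIds.replace("[", "").replace("]", "").split(",").foreach { id =>
      matched.set(oldPartitionIdList.indexOf(id.trim.toInt))  // sets indices 2 and 3, not ids 4 and 5
    }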

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java
new file mode 100644
index 0000000..ce66aac
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/DropPartitionCallable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.partition;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.spark.rdd.PartitionDropper;
+
+import org.apache.spark.sql.execution.command.DropPartitionCallableModel;
+
+public class DropPartitionCallable implements Callable<Void> {
+
+  private DropPartitionCallableModel dropPartitionCallableModel;
+
+  public DropPartitionCallable(DropPartitionCallableModel 
dropPartitionCallableModel) {
+    this.dropPartitionCallableModel = dropPartitionCallableModel;
+  }
+
+  @Override public Void call() {
+    PartitionDropper.triggerPartitionDrop(dropPartitionCallableModel);
+    return null;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 181f6e4..7cf8c88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -107,11 +107,11 @@ class MergeResultImpl extends MergeResult[String, 
Boolean] {
   override def getKey(key: String, value: Boolean): (String, Boolean) = (key, 
value)
 }
 
-trait SplitResult[K, V] extends Serializable {
+trait AlterPartitionResult[K, V] extends Serializable {
   def getKey(key: String, value: Boolean): (K, V)
 }
 
-class SplitResultImpl extends SplitResult[String, Boolean] {
+class AlterPartitionResultImpl extends AlterPartitionResult[String, Boolean] {
   override def getKey(key: String, value: Boolean): (String, Boolean) = (key, 
value)
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
new file mode 100644
index 0000000..6cf8a7a
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.util.Random
+
+import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.AlterPartitionModel
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.processing.spliter.RowResultProcessor
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.AlterPartitionResult
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
+
+class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: 
AlterPartitionModel,
+    result: AlterPartitionResult[K, V],
+    partitionIds: Seq[String],
+    bucketId: Int,
+    identifier: AbsoluteTableIdentifier,
+    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
+
+    var storeLocation: String = null
+    val carbonLoadModel = alterPartitionModel.carbonLoadModel
+    val segmentId = alterPartitionModel.segmentId
+    val oldPartitionIds = alterPartitionModel.oldPartitionIds
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val databaseName = carbonTable.getDatabaseName
+    val factTableName = carbonTable.getFactTableName
+    val partitionInfo = carbonTable.getPartitionInfo(factTableName)
+
+    override protected def getPartitions: Array[Partition] = {
+        val sc = alterPartitionModel.sqlContext.sparkContext
+        sc.setLocalProperty("spark.scheduler.pool", "DDL")
+        sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+        firstParent[Array[AnyRef]].partitions
+    }
+
+    override def compute(split: Partition, context: TaskContext): Iterator[(K, 
V)] = {
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+        val rows = firstParent[Array[AnyRef]].iterator(split, 
context).toList.asJava
+        val iter = new Iterator[(K, V)] {
+            val partitionId = partitionInfo.getPartitionId(split.index)
+            carbonLoadModel.setTaskNo(String.valueOf(partitionId))
+            carbonLoadModel.setSegmentId(segmentId)
+            carbonLoadModel.setPartitionId("0")
+            val tempLocationKey = CarbonDataProcessorUtil
+              .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+                  carbonLoadModel.getTableName,
+                  segmentId,
+                  carbonLoadModel.getTaskNo,
+                  false,
+                  true)
+            // this property determines whether the temp location for carbon is inside
+            // the container temp dir or the yarn application directory.
+            val carbonUseLocalDir = CarbonProperties.getInstance()
+              .getProperty("carbon.use.local.dir", "false")
+
+            if (carbonUseLocalDir.equalsIgnoreCase("true")) {
+
+                val storeLocations = 
CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
+                if (null != storeLocations && storeLocations.nonEmpty) {
+                    storeLocation = 
storeLocations(Random.nextInt(storeLocations.length))
+                }
+                if (storeLocation == null) {
+                    storeLocation = System.getProperty("java.io.tmpdir")
+                }
+            } else {
+                storeLocation = System.getProperty("java.io.tmpdir")
+            }
+            storeLocation = storeLocation + '/' + System.nanoTime() + '/' + 
split.index
+            CarbonProperties.getInstance().addProperty(tempLocationKey, 
storeLocation)
+            LOGGER.info(s"Temp storeLocation taken is $storeLocation")
+
+            val tempStoreLoc = 
CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
+                factTableName,
+                carbonLoadModel.getTaskNo,
+                "0",
+                segmentId,
+                false,
+                true
+            )
+
+            val loadStatus = if (rows.isEmpty) {
+                LOGGER.info("After repartitioning this split, no target rows to write back.")
+                true
+            } else {
+                val segmentProperties = 
PartitionUtils.getSegmentProperties(identifier,
+                    segmentId, partitionIds.toList, oldPartitionIds, 
partitionInfo)
+                val processor = new RowResultProcessor(
+                    carbonTable,
+                    carbonLoadModel,
+                    segmentProperties,
+                    tempStoreLoc,
+                    bucketId)
+                try {
+                    processor.execute(rows)
+                } catch {
+                    case e: Exception =>
+                        sys.error(s"Exception when executing RowResultProcessor: ${e.getMessage}")
+                } finally {
+                    CarbonLoaderUtil
+                      .deleteLocalDataLoadFolderLocation(carbonLoadModel, 
false, true)
+                }
+            }
+
+            val loadResult = segmentId
+            var finished = false
+
+            override def hasNext: Boolean = {
+                !finished
+            }
+
+            override def next(): (K, V) = {
+                finished = true
+                result.getKey(loadResult, loadStatus)
+            }
+        }
+        iter
+    }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
deleted file mode 100644
index e481fc4..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableSplitPartitionRDD.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.util.Random
-
-import org.apache.spark.{Partition, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.util.PartitionUtils
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.PartitionInfo
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.spliter.RowResultSpliterProcessor
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.spark.SplitResult
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-
-class AlterTableSplitPartitionRDD[K, V](
-    sc: SparkContext,
-    result: SplitResult[K, V],
-    partitionIds: Seq[String],
-    segmentId: String,
-    bucketId: Int,
-    carbonLoadModel: CarbonLoadModel,
-    identifier: AbsoluteTableIdentifier,
-    storePath: String,
-    oldPartitionIdList: List[Int],
-    prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
-
-    sc.setLocalProperty("spark.scheduler.pool", "DDL")
-    sc.setLocalProperty("spark.job.interruptOnCancel", "true")
-
-    var storeLocation: String = null
-    var splitResult: String = null
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val databaseName = carbonTable.getDatabaseName
-    val factTableName = carbonTable.getFactTableName
-    val partitionInfo = carbonTable.getPartitionInfo(factTableName)
-
-    override protected def getPartitions: Array[Partition] = 
firstParent[Array[AnyRef]].partitions
-
-    override def compute(split: Partition, context: TaskContext): Iterator[(K, 
V)] = {
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-        val rows = firstParent[Array[AnyRef]].iterator(split, 
context).toList.asJava
-        val iter = new Iterator[(K, V)] {
-            val partitionId = partitionInfo.getPartitionId(split.index)
-            carbonLoadModel.setTaskNo(String.valueOf(partitionId))
-            carbonLoadModel.setSegmentId(segmentId)
-            carbonLoadModel.setPartitionId("0")
-            val tempLocationKey = CarbonDataProcessorUtil
-              .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-                  carbonLoadModel.getTableName,
-                  segmentId,
-                  carbonLoadModel.getTaskNo,
-                  false,
-                  true)
-            // this property is used to determine whether temp location for 
carbon is inside
-            // container temp dir or is yarn application directory.
-            val carbonUseLocalDir = CarbonProperties.getInstance()
-              .getProperty("carbon.use.local.dir", "false")
-
-            if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
-                val storeLocations = 
CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-                if (null != storeLocations && storeLocations.nonEmpty) {
-                    storeLocation = 
storeLocations(Random.nextInt(storeLocations.length))
-                }
-                if (storeLocation == null) {
-                    storeLocation = System.getProperty("java.io.tmpdir")
-                }
-            } else {
-                storeLocation = System.getProperty("java.io.tmpdir")
-            }
-            storeLocation = storeLocation + '/' + System.nanoTime() + '/' + 
split.index
-            CarbonProperties.getInstance().addProperty(tempLocationKey, 
storeLocation)
-            LOGGER.info(s"Temp storeLocation taken is $storeLocation")
-
-            val tempStoreLoc = 
CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
-                factTableName,
-                carbonLoadModel.getTaskNo,
-                "0",
-                segmentId,
-                false,
-                true
-            )
-
-            val splitStatus = if (rows.isEmpty) {
-                LOGGER.info("After repartition this split, NO target rows to 
write back.")
-                true
-            } else {
-                try {
-                    val segmentProperties = 
PartitionUtils.getSegmentProperties(identifier,
-                        segmentId, partitionIds.toList, oldPartitionIdList, 
partitionInfo)
-                    val processor = new RowResultSpliterProcessor(
-                        carbonTable,
-                        carbonLoadModel,
-                        segmentProperties,
-                        tempStoreLoc,
-                        bucketId
-                    )
-                    processor.execute(rows)
-                } catch {
-                    case e: Exception =>
-                        sys.error(s"Exception when executing Row result 
processor ${e.getMessage}")
-                } finally {
-                    CarbonLoaderUtil
-                      .deleteLocalDataLoadFolderLocation(carbonLoadModel, 
false, true)
-                }
-
-            }
-
-            val splitResult = segmentId
-            var finished = false
-
-            override def hasNext: Boolean = {
-                !finished
-            }
-
-            override def next(): (K, V) = {
-                finished = true
-                result.getKey(splitResult, splitStatus)
-            }
-        }
-        iter
-    }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 2a39db5..86bc79f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.PartitionUtils
@@ -53,27 +54,23 @@ import org.apache.carbondata.spark.load.CarbonLoaderUtil
 /**
  * This RDD is used in alter table partition statement to get data of target 
partitions,
  * then repartition data according to new partitionInfo
- * @param sc
+ * @param alterPartitionModel  wraps the carbonLoadModel, segmentId, oldPartitionIds and sqlContext
+ * @param carbonTableIdentifier  identifier of the carbon table being altered
  * @param partitionIds  the ids of target partition to be scanned
- * @param storePath
- * @param segmentId
  * @param bucketId
- * @param oldPartitionIdList  the taskId in partition order before 
partitionInfo is modified
- * @param carbonTableIdentifier
- * @param carbonLoadModel
  */
-class CarbonScanPartitionRDD(
-    sc: SparkContext,
-    partitionIds: Seq[String],
-    storePath: String,
-    segmentId: String,
-    bucketId: Int,
-    oldPartitionIdList: List[Int],
+class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     carbonTableIdentifier: CarbonTableIdentifier,
-    carbonLoadModel: CarbonLoadModel)
-  extends RDD[(AnyRef, Array[AnyRef])](sc, Nil) {
+    partitionIds: Seq[String],
+    bucketId: Int)
+  extends RDD[(AnyRef, 
Array[AnyRef])](alterPartitionModel.sqlContext.sparkContext, Nil) {
 
-  private val queryId = sc.getConf.get("queryId", System.nanoTime() + "")
+  private val queryId = alterPartitionModel.sqlContext.sparkContext.getConf
+    .get("queryId", System.nanoTime() + "")
+  val segmentId = alterPartitionModel.segmentId
+  val carbonLoadModel = alterPartitionModel.carbonLoadModel
+  val oldPartitionIdList = alterPartitionModel.oldPartitionIds
+  val storePath = carbonLoadModel.getStorePath
   val identifier = new AbsoluteTableIdentifier(storePath, 
carbonTableIdentifier)
   var storeLocation: String = null
   var splitStatus: Boolean = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index c13a942..fb610c1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -36,14 +36,13 @@ object Compactor {
 
   def triggerCompaction(compactionCallableModel: CompactionCallableModel): 
Unit = {
 
-    val storePath = compactionCallableModel.storePath
     val storeLocation = compactionCallableModel.storeLocation
     val carbonTable = compactionCallableModel.carbonTable
     val loadsToMerge = compactionCallableModel.loadsToMerge
     val sc = compactionCallableModel.sqlContext
     val carbonLoadModel = compactionCallableModel.carbonLoadModel
     val compactionType = compactionCallableModel.compactionType
-
+    val storePath = carbonLoadModel.getStorePath
     val startTime = System.nanoTime()
     val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
     var finalMergeStatus = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index bca119e..c2b7b74 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, 
CompactionModel, SplitPartitionCallableModel}
+import org.apache.spark.sql.execution.command.{CompactionCallableModel, 
CompactionModel, DropPartitionCallableModel, SplitPartitionCallableModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -39,7 +39,7 @@ import 
org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadM
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.compaction.CompactionCallable
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.partition.SplitPartitionCallable
+import org.apache.carbondata.spark.partition.{DropPartitionCallable, 
SplitPartitionCallable}
 import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
 
 /**
@@ -149,7 +149,6 @@ object DataManagementFunc {
   }
 
   def executeCompaction(carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       compactionModel: CompactionModel,
       executor: ExecutorService,
       sqlContext: SQLContext,
@@ -161,7 +160,6 @@ object DataManagementFunc {
 
     var segList = carbonLoadModel.getLoadMetadataDetails
     var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-      storePath,
       carbonLoadModel,
       compactionModel.compactionSize,
       segList,
@@ -180,7 +178,6 @@ object DataManagementFunc {
       scanSegmentsAndSubmitJob(futureList,
         loadsToMerge,
         executor,
-        storePath,
         sqlContext,
         compactionModel,
         carbonLoadModel,
@@ -200,7 +197,7 @@ object DataManagementFunc {
       }
 
       // scan again and determine if anything is there to merge again.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
       segList = carbonLoadModel.getLoadMetadataDetails
       // in case of major compaction we will scan only once and come out as it 
will keep
       // on doing major for the new loads also.
@@ -215,7 +212,6 @@ object DataManagementFunc {
         loadsToMerge.clear()
       } else if (segList.size > 0) {
         loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-          storePath,
           carbonLoadModel,
           compactionModel.compactionSize,
           segList,
@@ -234,10 +230,8 @@ object DataManagementFunc {
    * @param futureList
    */
   private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
-      loadsToMerge: util
-      .List[LoadMetadataDetails],
+      loadsToMerge: util.List[LoadMetadataDetails],
       executor: ExecutorService,
-      storePath: String,
       sqlContext: SQLContext,
       compactionModel: CompactionModel,
       carbonLoadModel: CarbonLoadModel,
@@ -248,8 +242,7 @@ object DataManagementFunc {
     }
     )
 
-    val compactionCallableModel = CompactionCallableModel(storePath,
-      carbonLoadModel,
+    val compactionCallableModel = CompactionCallableModel(carbonLoadModel,
       storeLocation,
       compactionModel.carbonTable,
       loadsToMerge,
@@ -264,14 +257,13 @@ object DataManagementFunc {
   def executePartitionSplit( sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       executor: ExecutorService,
-      storePath: String,
       segment: String,
       partitionId: String,
       oldPartitionIdList: List[Int]): Unit = {
     val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
       CarbonCommonConstants.DEFAULT_COLLECTION_SIZE
     )
-    scanSegmentsForSplitPartition(futureList, executor, storePath, segment, 
partitionId,
+    scanSegmentsForSplitPartition(futureList, executor, segment, partitionId,
       sqlContext, carbonLoadModel, oldPartitionIdList)
     try {
         futureList.asScala.foreach(future => {
@@ -287,15 +279,13 @@ object DataManagementFunc {
 
   private def scanSegmentsForSplitPartition(futureList: 
util.List[Future[Void]],
       executor: ExecutorService,
-      storePath: String,
       segmentId: String,
       partitionId: String,
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       oldPartitionIdList: List[Int]): Unit = {
 
-    val splitModel = SplitPartitionCallableModel(storePath,
-      carbonLoadModel,
+    val splitModel = SplitPartitionCallableModel(carbonLoadModel,
       segmentId,
       partitionId,
       oldPartitionIdList,
@@ -305,9 +295,27 @@ object DataManagementFunc {
     futureList.add(future)
   }
 
-  def prepareCarbonLoadModel(storePath: String,
-      table: CarbonTable,
-      newCarbonLoadModel: CarbonLoadModel): Unit = {
+  def executeDroppingPartition(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val model = new DropPartitionCallableModel(carbonLoadModel,
+      segmentId, partitionId, oldPartitionIds, dropWithData, carbonTable, 
sqlContext)
+    val future: Future[Void] = executor.submit(new 
DropPartitionCallable(model))
+    try {
+        future.get
+    } catch {
+      case e: Exception =>
+        LOGGER.error(e, s"Exception in partition drop thread: ${ e.getMessage }")
+        throw e
+    }
+  }
+
+  def prepareCarbonLoadModel(table: CarbonTable, newCarbonLoadModel: 
CarbonLoadModel): Unit = {
     newCarbonLoadModel.setTableName(table.getFactTableName)
     val dataLoadSchema = new CarbonDataLoadSchema(table)
     // Need to fill dimension relation
@@ -315,7 +323,7 @@ object DataManagementFunc {
     
newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
     
newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
     newCarbonLoadModel.setStorePath(table.getStorePath)
-    CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
+    CommonUtil.readLoadMetadataDetails(newCarbonLoadModel)
     val loadStartTime = CarbonUpdateUtil.readCurrentTime();
     newCarbonLoadModel.setFactTimeStamp(loadStartTime)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
new file mode 100644
index 0000000..0a41f44
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.io.IOException
+
+import org.apache.spark.sql.execution.command.{AlterPartitionModel, 
DropPartitionCallableModel}
+import org.apache.spark.util.PartitionUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory}
+
+object PartitionDropper {
+
+  val logger = 
LogServiceFactory.getLogService(PartitionDropper.getClass.getName)
+
+  def triggerPartitionDrop(dropPartitionCallableModel: 
DropPartitionCallableModel): Unit = {
+    val alterPartitionModel = new 
AlterPartitionModel(dropPartitionCallableModel.carbonLoadModel,
+      dropPartitionCallableModel.segmentId,
+      dropPartitionCallableModel.oldPartitionIds,
+      dropPartitionCallableModel.sqlContext
+    )
+    val partitionId = dropPartitionCallableModel.partitionId
+    val oldPartitionIds = dropPartitionCallableModel.oldPartitionIds
+    val dropWithData = dropPartitionCallableModel.dropWithData
+    val carbonTable = dropPartitionCallableModel.carbonTable
+    val dbName = carbonTable.getDatabaseName
+    val tableName = carbonTable.getFactTableName
+    val identifier = carbonTable.getAbsoluteTableIdentifier
+    val carbonTableIdentifier = identifier.getCarbonTableIdentifier
+    val partitionInfo = carbonTable.getPartitionInfo(tableName)
+    val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+
+    var finalDropStatus = false
+    val bucketInfo = carbonTable.getBucketingInfo(tableName)
+    val bucketNumber = bucketInfo match {
+      case null => 1
+      case _ => bucketInfo.getNumberOfBuckets
+    }
+    val partitionIndex = oldPartitionIds.indexOf(Integer.valueOf(partitionId))
+    val targetPartitionId = partitionInfo.getPartitionType match {
+      case PartitionType.RANGE => if (partitionIndex == oldPartitionIds.length 
- 1) {
+        "0"
+      } else {
+        String.valueOf(oldPartitionIds(partitionIndex + 1))
+      }
+      case PartitionType.LIST => "0"
+    }
+
+    if (!dropWithData) {
+      try {
+        for (i <- 0 until bucketNumber) {
+          val bucketId = i
+          val rdd = new CarbonScanPartitionRDD(alterPartitionModel,
+            carbonTableIdentifier,
+            Seq(partitionId, targetPartitionId),
+            bucketId
+          ).partitionBy(partitioner).map(_._2)
+
+          val dropStatus = new AlterTableLoadPartitionRDD(alterPartitionModel,
+            new AlterPartitionResultImpl(),
+            Seq(partitionId),
+            bucketId,
+            identifier,
+            rdd).collect()
+
+          if (dropStatus.length == 0) {
+            finalDropStatus = false
+          } else {
+            finalDropStatus = dropStatus.forall(_._2)
+          }
+          if (!finalDropStatus) {
+            logger.audit(s"Drop Partition request failed for table " +
+                         s"${ dbName }.${ tableName }")
+            logger.error(s"Drop Partition request failed for table " +
+                         s"${ dbName }.${ tableName }")
+          }
+        }
+
+        if (finalDropStatus) {
+          try {
+            PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, 
identifier,
+              Seq(partitionId, targetPartitionId).toList, dbName,
+              tableName, partitionInfo)
+          } catch {
+            case e: IOException => sys.error(s"Exception while deleting original carbon files: " +
+                                             e.getMessage)
+          }
+          logger.audit(s"Drop Partition request completed for table " +
+                       s"${ dbName }.${ tableName }")
+          logger.info(s"Drop Partition request completed for table " +
+                      s"${ dbName }.${ tableName }")
+        }
+      } catch {
+        case e: Exception => sys.error(s"Exception in dropping partition 
action: ${ e.getMessage }")
+      }
+    } else {
+      PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, identifier,
+        Seq(partitionId).toList, dbName, tableName, partitionInfo)
+      logger.audit(s"Drop Partition request completed for table " +
+                   s"${ dbName }.${ tableName }")
+      logger.info(s"Drop Partition request completed for table " +
+                  s"${ dbName }.${ tableName }")
+    }
+  }
+}
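The targetPartitionId selection above is the heart of the drop-without-data path: LIST
partitions fall back to the default partition "0", while RANGE partitions merge into the next
range (or "0" when the last range is dropped). A plain-Scala restatement with placeholder ids:

    def targetPartition(oldPartitionIds: List[Int], dropId: Int, isRangePartition: Boolean): String =
      if (!isRangePartition) "0"                              // LIST: rows go to the default partition
      else {
        val idx = oldPartitionIds.indexOf(dropId)
        if (idx == oldPartitionIds.length - 1) "0"            // RANGE, last range dropped
        else String.valueOf(oldPartitionIds(idx + 1))         // RANGE: merge into the next range
      }

    targetPartition(List(0, 1, 2, 3), dropId = 2, isRangePartition = true)  // -> "3"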

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
index 48e1bee..fca7542 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala
@@ -19,22 +19,24 @@ package org.apache.carbondata.spark.rdd
 
 import java.io.IOException
 
-import org.apache.spark.sql.execution.command.SplitPartitionCallableModel
+import org.apache.spark.sql.execution.command.{AlterPartitionModel, 
SplitPartitionCallableModel}
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.spark.{PartitionFactory, SplitResultImpl}
+import org.apache.carbondata.spark.{AlterPartitionResultImpl, PartitionFactory}
 
 object PartitionSplitter {
 
   val logger = 
LogServiceFactory.getLogService(PartitionSplitter.getClass.getName)
 
   def triggerPartitionSplit(splitPartitionCallableModel: 
SplitPartitionCallableModel): Unit = {
-     val sc = splitPartitionCallableModel.sqlContext.sparkContext
+
+     val alterPartitionModel = new 
AlterPartitionModel(splitPartitionCallableModel.carbonLoadModel,
+       splitPartitionCallableModel.segmentId,
+       splitPartitionCallableModel.oldPartitionIds,
+       splitPartitionCallableModel.sqlContext
+     )
      val partitionId = splitPartitionCallableModel.partitionId
-     val storePath = splitPartitionCallableModel.storePath
-     val segmentId = splitPartitionCallableModel.segmentId
-     val oldPartitionIdList = splitPartitionCallableModel.oldPartitionIdList
      val carbonLoadModel = splitPartitionCallableModel.carbonLoadModel
      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
      val identifier = carbonTable.getAbsoluteTableIdentifier
@@ -53,25 +55,17 @@ object PartitionSplitter {
      for (i <- 0 until bucketNumber) {
        val bucketId = i
        val rdd = new CarbonScanPartitionRDD(
-         sc,
-         Seq(partitionId),
-         storePath,
-         segmentId,
-         bucketId,
-         oldPartitionIdList,
+         alterPartitionModel,
          carbonTableIdentifier,
-         carbonLoadModel
+         Seq(partitionId),
+         bucketId
        ).partitionBy(partitioner).map(_._2)
 
-       val splitStatus = new AlterTableSplitPartitionRDD(sc,
-         new SplitResultImpl(),
+       val splitStatus = new AlterTableLoadPartitionRDD(alterPartitionModel,
+         new AlterPartitionResultImpl(),
          Seq(partitionId),
-         segmentId,
          bucketId,
-         carbonLoadModel,
          identifier,
-         storePath,
-         oldPartitionIdList,
          rdd).collect()
 
        if (splitStatus.length == 0) {
@@ -89,8 +83,8 @@ object PartitionSplitter {
      if (finalSplitStatus) {
        try {
          PartitionUtils.
-           deleteOriginalCarbonFile(identifier, segmentId, 
Seq(partitionId).toList,
-             oldPartitionIdList, storePath, databaseName, tableName, 
partitionInfo, carbonLoadModel)
+           deleteOriginalCarbonFile(alterPartitionModel, identifier, Seq(partitionId).toList,
+             databaseName, tableName, partitionInfo)
        } catch {
          case e: IOException => sys.error(s"Exception while delete original 
carbon files " +
          e.getMessage)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index fd265a8..f123624 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -537,7 +537,7 @@ object CommonUtil {
     }
   }
 
-  def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit 
= {
+  def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
     val metadataPath = 
model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
     val details = SegmentStatusManager.readLoadMetadata(metadataPath)
     model.setLoadMetadataDetails(details.toList.asJava)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 47eaece..601c0c7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -314,7 +314,6 @@ object GlobalDictionaryUtil {
         isComplexes += dimensions(i).isComplex
       }
     }
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, 
table)
     val primDimensions = primDimensionsBuffer.map { x => x }.toArray
     val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
       getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
@@ -330,7 +329,7 @@ object GlobalDictionaryUtil {
       
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 
2)(1)
     // get load count
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
     DictionaryLoadModel(table,
       dimensions,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index cc2cc82..f5d69ef 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -118,19 +118,31 @@ case class CompactionModel(compactionSize: Long,
     carbonTable: CarbonTable,
     isDDLTrigger: Boolean)
 
-case class CompactionCallableModel(storePath: String,
-    carbonLoadModel: CarbonLoadModel,
+case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
     storeLocation: String,
     carbonTable: CarbonTable,
     loadsToMerge: util.List[LoadMetadataDetails],
     sqlContext: SQLContext,
     compactionType: CompactionType)
 
-case class SplitPartitionCallableModel(storePath: String,
-    carbonLoadModel: CarbonLoadModel,
+case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
+    segmentId: String,
+    oldPartitionIds: List[Int],
+    sqlContext: SQLContext
+)
+
+case class SplitPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
     segmentId: String,
     partitionId: String,
-    oldPartitionIdList: List[Int],
+    oldPartitionIds: List[Int],
+    sqlContext: SQLContext)
+
+case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
+    segmentId: String,
+    partitionId: String,
+    oldPartitionIds: List[Int],
+    dropWithData: Boolean,
+    carbonTable: CarbonTable,
     sqlContext: SQLContext)
 
 case class DataTypeInfo(dataType: String, precision: Int = 0, scale: Int = 0)
@@ -160,7 +172,8 @@ case class AlterTableDropColumnModel(databaseName: 
Option[String],
 
 case class AlterTableDropPartitionModel(databaseName: Option[String],
     tableName: String,
-    partitionId: String)
+    partitionId: String,
+    dropWithData: Boolean)
 
 case class AlterTableSplitPartitionModel(databaseName: Option[String],
     tableName: String,
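A hedged sketch of how the new models nest, mirroring the wiring in PartitionDropper above
(carbonLoadModel, carbonTable and sqlContext are assumed to exist in the caller; ids are
placeholders):

    val dropModel = DropPartitionCallableModel(carbonLoadModel = carbonLoadModel, segmentId = "0",
      partitionId = "2", oldPartitionIds = List(0, 1, 2, 3), dropWithData = false,
      carbonTable = carbonTable, sqlContext = sqlContext)
    // the shared fields are rewrapped for the scan/reload RDDs:
    val alterModel = AlterPartitionModel(dropModel.carbonLoadModel, dropModel.segmentId,
      dropModel.oldPartitionIds, dropModel.sqlContext)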

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 3982f7b..002ed27 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable.ListBuffer
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.execution.command.AlterPartitionModel
 
 import org.apache.carbondata.core.datastore.block.{SegmentProperties, 
TableBlockInfo}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -151,13 +152,17 @@ object PartitionUtils {
   }
 
   @throws(classOf[IOException])
-  def deleteOriginalCarbonFile(identifier: AbsoluteTableIdentifier, segmentId: 
String,
-      partitionIds: List[String], oldPartitionIdList: List[Int], storePath: 
String,
-      dbName: String, tableName: String, partitionInfo: PartitionInfo,
-      carbonLoadModel: CarbonLoadModel): Unit = {
+  def deleteOriginalCarbonFile(alterPartitionModel: AlterPartitionModel,
+      identifier: AbsoluteTableIdentifier,
+      partitionIds: List[String], dbName: String, tableName: String,
+      partitionInfo: PartitionInfo): Unit = {
+    val carbonLoadModel = alterPartitionModel.carbonLoadModel
+    val segmentId = alterPartitionModel.segmentId
+    val oldPartitionIds = alterPartitionModel.oldPartitionIds
     val newTime = carbonLoadModel.getFactTimeStamp
+    val storePath = carbonLoadModel.getStorePath
     val tableBlockInfoList =
-      getPartitionBlockList(identifier, segmentId, partitionIds, 
oldPartitionIdList,
+      getPartitionBlockList(identifier, segmentId, partitionIds, 
oldPartitionIds,
         partitionInfo).asScala
     val pathList: util.List[String] = new util.ArrayList[String]()
     val carbonTablePath = new CarbonTablePath(storePath, dbName, tableName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ef2a917..596cebf 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -104,7 +104,7 @@ object CarbonDataRDDFactory {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
     // reading the start time of data load.
     val loadStartTime = CarbonUpdateUtil.readCurrentTime();
@@ -228,7 +228,7 @@ object CarbonDataRDDFactory {
       compactionLock: ICarbonLock): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
-    CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+    CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     val compactionThread = new Thread {
       override def run(): Unit = {
 
@@ -238,7 +238,6 @@ object CarbonDataRDDFactory {
           var exception: Exception = null
           try {
             DataManagementFunc.executeCompaction(carbonLoadModel: 
CarbonLoadModel,
-              storePath: String,
               compactionModel: CompactionModel,
               executor, sqlContext, storeLocation
             )
@@ -269,7 +268,7 @@ object CarbonDataRDDFactory {
               val compactionType = 
CarbonCompactionUtil.determineCompactionType(metadataPath)
 
               val newCarbonLoadModel = new CarbonLoadModel()
-              DataManagementFunc.prepareCarbonLoadModel(storePath, table, 
newCarbonLoadModel)
+              DataManagementFunc.prepareCarbonLoadModel(table, 
newCarbonLoadModel)
 
               val compactionSize = CarbonDataMergerUtil
                   .getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -282,7 +281,6 @@ object CarbonDataRDDFactory {
               // proceed for compaction
               try {
                 DataManagementFunc.executeCompaction(newCarbonLoadModel,
-                  newCarbonLoadModel.getStorePath,
                   newcompactionModel,
                   executor, sqlContext, storeLocation
                 )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8a39b0a..130f305 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -593,7 +593,7 @@ case class LoadTable(
           LOGGER.info(s"Overwrite is in progress for carbon table with 
$dbName.$tableName")
         }
         if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
         }
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && 
carbonLoadModel.getUseOnePass &&
             StringUtils.isEmpty(columnDict) && 
StringUtils.isEmpty(allDictionaryPath)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c7b72d5..0edfccf 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -76,7 +76,6 @@ object CarbonDataRDDFactory {
   def alterTableForCompaction(sqlContext: SQLContext,
       alterTableModel: AlterTableModel,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       storeLocation: String): Unit = {
     var compactionSize: Long = 0
     var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
@@ -104,7 +103,7 @@ object CarbonDataRDDFactory {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
     // reading the start time of data load.
     val loadStartTime : Long =
@@ -135,7 +134,6 @@ object CarbonDataRDDFactory {
       LOGGER.info("System level compaction lock is enabled.")
       handleCompactionForSystemLocking(sqlContext,
         carbonLoadModel,
-        storePath,
         storeLocation,
         compactionType,
         carbonTable,
@@ -154,7 +152,6 @@ object CarbonDataRDDFactory {
         try {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
-            storePath,
             storeLocation,
             compactionModel,
             lock
@@ -178,14 +175,12 @@ object CarbonDataRDDFactory {
   def alterTableSplitPartition(sqlContext: SQLContext,
       partitionId: String,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       oldPartitionIdList: List[Int]): Unit = {
     LOGGER.audit(s"Add partition request received for table " +
          s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
     try {
       startSplitThreads(sqlContext,
         carbonLoadModel,
-        storePath,
         partitionId,
         oldPartitionIdList)
     } catch {
@@ -195,9 +190,28 @@ object CarbonDataRDDFactory {
     }
   }
 
+  def alterTableDropPartition(sqlContext: SQLContext,
+      partitionId: String,
+      carbonLoadModel: CarbonLoadModel,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    LOGGER.audit(s"Drop partition request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
+    try {
+      startDropThreads(sqlContext,
+        carbonLoadModel,
+        partitionId,
+        dropWithData,
+        oldPartitionIds)
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception in start dropping partition thread. ${ 
e.getMessage }")
+        throw e
+    }
+  }
+
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       storeLocation: String,
       compactionType: CompactionType,
       carbonTable: CarbonTable,
@@ -212,7 +226,6 @@ object CarbonDataRDDFactory {
       try {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
-          storePath,
           storeLocation,
           compactionModel,
           lock
@@ -248,7 +261,6 @@ object CarbonDataRDDFactory {
 
   def startCompactionThreads(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       storeLocation: String,
       compactionModel: CompactionModel,
       compactionLock: ICarbonLock): Unit = {
@@ -257,7 +269,7 @@ object CarbonDataRDDFactory {
     if (compactionModel.compactionType != 
CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
       // update the updated table status. For the case of Update Delta 
Compaction the Metadata
       // is filled in LoadModel, no need to refresh.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
 
     val compactionThread = new Thread {
@@ -269,7 +281,6 @@ object CarbonDataRDDFactory {
           var exception: Exception = null
           try {
             DataManagementFunc.executeCompaction(carbonLoadModel: 
CarbonLoadModel,
-              storePath: String,
               compactionModel: CompactionModel,
               executor, sqlContext, storeLocation
             )
@@ -301,7 +312,7 @@ object CarbonDataRDDFactory {
               val compactionType = 
CarbonCompactionUtil.determineCompactionType(metadataPath)
 
               val newCarbonLoadModel = new CarbonLoadModel()
-              DataManagementFunc.prepareCarbonLoadModel(storePath, table, 
newCarbonLoadModel)
+              DataManagementFunc.prepareCarbonLoadModel(table, 
newCarbonLoadModel)
 
               val compactionSize = CarbonDataMergerUtil
                   .getCompactionSize(CompactionType.MAJOR_COMPACTION)
@@ -314,7 +325,6 @@ object CarbonDataRDDFactory {
               // proceed for compaction
               try {
                 DataManagementFunc.executeCompaction(newCarbonLoadModel,
-                  newCarbonLoadModel.getStorePath,
                   newcompactionModel,
                   executor, sqlContext, storeLocation
                 )
@@ -365,7 +375,6 @@ object CarbonDataRDDFactory {
   case class SplitThread(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       executor: ExecutorService,
-      storePath: String,
       segmentId: String,
       partitionId: String,
       oldPartitionIdList: List[Int]) extends Thread {
@@ -374,8 +383,7 @@ object CarbonDataRDDFactory {
         var exception: Exception = null
         try {
           DataManagementFunc.executePartitionSplit(sqlContext,
-            carbonLoadModel, executor, storePath, segmentId, partitionId,
-            oldPartitionIdList)
+            carbonLoadModel, executor, segmentId, partitionId, 
oldPartitionIdList)
           triggeredSplitPartitionStatus = true
         } catch {
           case e: Exception =>
@@ -388,9 +396,26 @@ object CarbonDataRDDFactory {
       }
   }
 
+  case class DropPartitionThread(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      executor: ExecutorService,
+      segmentId: String,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]) extends Thread {
+    override def run(): Unit = {
+      try {
+        DataManagementFunc.executeDroppingPartition(sqlContext, 
carbonLoadModel, executor,
+          segmentId, partitionId, dropWithData, oldPartitionIds)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in dropping partition thread: ${ 
e.getMessage } }")
+      }
+    }
+  }
+
   def startSplitThreads(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       partitionId: String,
       oldPartitionIdList: List[Int]): Unit = {
     val numberOfCores = CarbonProperties.getInstance()
@@ -405,7 +430,7 @@ object CarbonDataRDDFactory {
       val threadArray: Array[SplitThread] = new 
Array[SplitThread](validSegments.size)
       var i = 0
       validSegments.foreach { segmentId =>
-        threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor, 
storePath,
+        threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
           segmentId, partitionId, oldPartitionIdList)
         threadArray(i).start()
         i += 1
@@ -429,6 +454,46 @@ object CarbonDataRDDFactory {
     }
   }
 
+  def startDropThreads(sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      partitionId: String,
+      dropWithData: Boolean,
+      oldPartitionIds: List[Int]): Unit = {
+    val numberOfCores = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
+      CarbonCommonConstants.DEFAULT_NUMBER_CORES)
+    val executor : ExecutorService = 
Executors.newFixedThreadPool(numberOfCores.toInt)
+    try {
+      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      val segmentStatusManager = new 
SegmentStatusManager(absoluteTableIdentifier)
+      val validSegments = 
segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
+      val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
+      var i = 0
+      for (segmentId: String <- validSegments) {
+        threadArray(i) = DropPartitionThread(sqlContext, carbonLoadModel, 
executor,
+            segmentId, partitionId, dropWithData, oldPartitionIds)
+        threadArray(i).start()
+        i += 1
+      }
+      for (thread <- threadArray) {
+        thread.join()
+      }
+    } catch {
+      case e: Exception =>
+        LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
+    } finally {
+      executor.shutdown()
+      try {
+        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
+      } catch {
+        case e: Exception =>
+          LOGGER.error(s"Exception in dropping partition thread while deleting 
partial load file" +
+                       s" ${ e.getMessage }")
+      }
+    }
+  }
+
   def loadCarbonData(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
@@ -473,7 +538,6 @@ object CarbonDataRDDFactory {
 
           handleCompactionForSystemLocking(sqlContext,
             carbonLoadModel,
-            storePath,
             storeLocation,
             CompactionType.MINOR_COMPACTION,
             carbonTable,
@@ -490,7 +554,6 @@ object CarbonDataRDDFactory {
             try {
               startCompactionThreads(sqlContext,
                 carbonLoadModel,
-                storePath,
                 storeLocation,
                 compactionModel,
                 lock

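[Editor's note] The drop flow added above follows the same fan-out pattern as the existing split flow: a fixed thread pool sized from a CarbonProperties setting, one worker per valid segment, join on all of them, then release the pool in a finally block. The sketch below is a simplified, self-contained illustration of that pattern only; dropInSegment, the hard-coded segment list, and the pool size of 2 are placeholders, not CarbonData APIs.

import java.util.concurrent.{Callable, Executors, TimeUnit}
import scala.collection.JavaConverters._

object DropFanOutSketch {
  // Placeholder for the per-segment work that the real code delegates to
  // DataManagementFunc.executeDroppingPartition.
  def dropInSegment(segmentId: String, partitionId: String, dropWithData: Boolean): Unit =
    println(s"segment $segmentId: dropping partition $partitionId (withData = $dropWithData)")

  def main(args: Array[String]): Unit = {
    val validSegments = Seq("0", "1", "2")          // stand-in for SegmentStatusManager output
    val executor = Executors.newFixedThreadPool(2)  // stand-in for NUM_CORES_ALT_PARTITION
    try {
      // one task per valid segment, run to completion before returning
      val tasks = validSegments.map { segmentId =>
        new Callable[Unit] { def call(): Unit = dropInSegment(segmentId, "2", dropWithData = false) }
      }
      executor.invokeAll(tasks.asJava)
    } finally {
      executor.shutdown()                           // always release the pool
      executor.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}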
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 3f0153e..7ed280e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -21,7 +21,7 @@ import java.text.SimpleDateFormat
 import java.util
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
@@ -52,6 +52,7 @@ import 
org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, 
TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -177,7 +178,6 @@ case class AlterTableCompaction(alterTableModel: 
AlterTableModel) extends Runnab
           .alterTableForCompaction(sparkSession.sqlContext,
             alterTableModel,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             storeLocation
           )
     } catch {
@@ -301,7 +301,6 @@ case class 
AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
       CarbonDataRDDFactory.alterTableSplitPartition(sparkSession.sqlContext,
         partitionId.toString,
         carbonLoadModel,
-        relation.tableMeta.storePath,
         oldPartitionIds.asScala.toList
       )
       success = true
@@ -313,6 +312,7 @@ case class 
AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
       AlterTableUtil.releaseLocks(locks)
       CacheProvider.getInstance().dropAllCache()
       LOGGER.info("Locks released after alter table add/split partition 
action.")
+      LOGGER.audit("Locks released after alter table add/split partition 
action.")
       if (success) {
         LOGGER.info(s"Alter table add/split partition is successful for table 
$dbName.$tableName")
         LOGGER.audit(s"Alter table add/split partition is successful for table 
$dbName.$tableName")
@@ -322,7 +322,142 @@ case class 
AlterTableSplitPartitionCommand(splitPartitionModel: AlterTableSplitP
   }
 }
 
-case class CreateTable(cm: TableModel, createDSTable: Boolean = true) extends 
RunnableCommand
+case class AlterTableDropPartition(alterTableDropPartitionModel: 
AlterTableDropPartitionModel)
+  extends RunnableCommand with DataProcessCommand with SchemaProcessCommand {
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  val tableName = alterTableDropPartitionModel.tableName
+  var dbName: String = null
+  val partitionId = alterTableDropPartitionModel.partitionId
+  val dropWithData = alterTableDropPartitionModel.dropWithData
+  if (partitionId.toInt == 0) {
+    sys.error(s"Cannot drop default partition! Please use a delete statement instead!")
+  }
+  var partitionInfo: PartitionInfo = null
+  var carbonMetaStore: CarbonMetaStore = null
+  var relation: CarbonRelation = null
+  var storePath: String = null
+  var table: CarbonTable = null
+  var carbonTableIdentifier: CarbonTableIdentifier = null
+  val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
+  val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+    LockUsage.COMPACTION_LOCK,
+    LockUsage.DELETE_SEGMENT_LOCK,
+    LockUsage.DROP_TABLE_LOCK,
+    LockUsage.CLEAN_FILES_LOCK,
+    LockUsage.ALTER_PARTITION_LOCK)
+
+  def run(sparkSession: SparkSession): Seq[Row] = {
+    processSchema(sparkSession)
+    processData(sparkSession)
+    Seq.empty
+  }
+
+  override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+    dbName = alterTableDropPartitionModel.databaseName
+      .getOrElse(sparkSession.catalog.currentDatabase)
+    carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    relation = carbonMetaStore.lookupRelation(Option(dbName), 
tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+    if (relation == null) {
+      sys.error(s"Table $dbName.$tableName does not exist")
+    }
+    carbonTableIdentifier = relation.tableMeta.carbonTableIdentifier
+    storePath = relation.tableMeta.storePath
+    carbonMetaStore.checkSchemasModifiedTimeAndReloadTables(storePath)
+    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + 
tableName)) {
+      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
+      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    }
+    table = relation.tableMeta.carbonTable
+    partitionInfo = table.getPartitionInfo(tableName)
+    if (partitionInfo == null) {
+      sys.error(s"Table $tableName is not a partition table.")
+    }
+    val partitionIds = 
partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
+    // keep a copy of partitionIdList before update partitionInfo.
+    // will be used in partition data scan
+    oldPartitionIds.addAll(partitionIds.asJava)
+    val partitionIndex = partitionIds.indexOf(Integer.valueOf(partitionId))
+    partitionInfo.getPartitionType match {
+      case PartitionType.HASH => sys.error(s"Hash partition cannot be 
dropped!")
+      case PartitionType.RANGE =>
+        val rangeInfo = new util.ArrayList(partitionInfo.getRangeInfo)
+        val rangeToRemove = partitionInfo.getRangeInfo.get(partitionIndex - 1)
+        rangeInfo.remove(rangeToRemove)
+        partitionInfo.setRangeInfo(rangeInfo)
+      case PartitionType.LIST =>
+        val listInfo = new util.ArrayList(partitionInfo.getListInfo)
+        val listToRemove = partitionInfo.getListInfo.get(partitionIndex - 1)
+        listInfo.remove(listToRemove)
+        partitionInfo.setListInfo(listInfo)
+      case PartitionType.RANGE_INTERVAL =>
+        sys.error(s"Dropping range interval partition isn't support yet!")
+    }
+    partitionInfo.dropPartition(partitionIndex)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, 
carbonTableIdentifier)
+    val schemaFilePath = carbonTablePath.getSchemaFilePath
+    // read TableInfo
+    val tableInfo = 
carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
+
+    val schemaConverter = new ThriftWrapperSchemaConverterImpl()
+    val wrapperTableInfo = 
schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
+      dbName, tableName, storePath)
+    val tableSchema = wrapperTableInfo.getFactTable
+    tableSchema.setPartitionInfo(partitionInfo)
+    wrapperTableInfo.setFactTable(tableSchema)
+    wrapperTableInfo.setLastUpdatedTime(System.currentTimeMillis())
+    val thriftTable =
+      schemaConverter.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, 
tableName)
+    
thriftTable.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
+      .setTime_stamp(System.currentTimeMillis)
+    carbonMetaStore.updateMetadataByThriftTable(schemaFilePath, thriftTable,
+      dbName, tableName, storePath)
+    CarbonUtil.writeThriftTableToSchemaFile(schemaFilePath, thriftTable)
+    // update the schema modified time
+    carbonMetaStore.updateAndTouchSchemasUpdatedTime(storePath)
+    // sparkSession.catalog.refreshTable(tableName)
+    Seq.empty
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    var locks = List.empty[ICarbonLock]
+    var success = false
+    try {
+      locks = AlterTableUtil.validateTableAndAcquireLock(dbName, tableName,
+        locksToBeAcquired)(sparkSession)
+      val carbonLoadModel = new CarbonLoadModel()
+      val dataLoadSchema = new CarbonDataLoadSchema(table)
+      // Need to fill dimension relation
+      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+      carbonLoadModel.setTableName(carbonTableIdentifier.getTableName)
+      carbonLoadModel.setDatabaseName(carbonTableIdentifier.getDatabaseName)
+      carbonLoadModel.setStorePath(storePath)
+      val loadStartTime = CarbonUpdateUtil.readCurrentTime
+      carbonLoadModel.setFactTimeStamp(loadStartTime)
+      CarbonDataRDDFactory.alterTableDropPartition(sparkSession.sqlContext,
+        partitionId,
+        carbonLoadModel,
+        dropWithData,
+        oldPartitionIds.asScala.toList
+      )
+      success = true
+    } catch {
+      case e: Exception =>
+        sys.error(s"Drop Partition failed. Please check logs for more info. ${ 
e.getMessage } ")
+      success = false
+    } finally {
+      CacheProvider.getInstance().dropAllCache()
+      AlterTableUtil.releaseLocks(locks)
+      LOGGER.info("Locks released after alter table drop partition action.")
+      LOGGER.audit("Locks released after alter table drop partition action.")
+    }
+    LOGGER.info(s"Alter table drop partition is successful for table 
$dbName.$tableName")
+    LOGGER.audit(s"Alter table drop partition is successful for table 
$dbName.$tableName")
+    Seq.empty
+  }
+}
+
+case class CreateTable(cm: TableModel, createDSTable: Boolean = true) 
extends RunnableCommand
     with SchemaProcessCommand {
 
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -796,7 +931,7 @@ case class LoadTable(
           LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is 
in progress")
         }
         if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
+          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
         }
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && 
carbonLoadModel.getUseOnePass &&
             StringUtils.isEmpty(column_dict) && 
StringUtils.isEmpty(all_dictionary_path)) {

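[Editor's note] The index arithmetic in the new AlterTableDropPartition.processSchema is easiest to see with concrete values. Below is a worked illustration using plain Scala collections in place of PartitionInfo; the ids and range boundaries are made up for the example. Because the boundary list carries no entry for the default partition (id 0), the boundary to remove sits at partitionIndex - 1, while the partition id itself is removed at partitionIndex.

import scala.collection.mutable.ListBuffer

// Illustration only: plain collections standing in for PartitionInfo state.
// Assumed example: partition ids (0, 1, 2, 3) where 0 is the default partition,
// and range boundaries "2015", "2016", "2017" for the remaining three.
val partitionIds = ListBuffer(0, 1, 2, 3)
val rangeInfo = ListBuffer("2015", "2016", "2017")

val partitionIdToDrop = 2
val partitionIndex = partitionIds.indexOf(partitionIdToDrop)   // 2

// rangeInfo has no slot for the default partition, hence the -1 offset
rangeInfo.remove(partitionIndex - 1)     // removes boundary "2016"
partitionIds.remove(partitionIndex)      // removes id 2

assert(partitionIds == ListBuffer(0, 1, 3))
assert(rangeInfo == ListBuffer("2015", "2017"))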
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cb51b862/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 1d74bee..24b2981 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -72,7 +72,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     alterTableModifyDataType | alterTableDropColumn | alterTableAddColumns
 
   protected lazy val alterPartition: Parser[LogicalPlan] =
-    alterAddPartition | alterSplitPartition
+    alterAddPartition | alterSplitPartition | alterDropPartition
 
   protected lazy val alterAddPartition: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> PARTITION ~>
@@ -95,6 +95,20 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         AlterTableSplitPartitionCommand(alterTableSplitPartitionModel)
     }
 
+  protected lazy val alterDropPartition: Parser[LogicalPlan] =
+    ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (DROP ~> PARTITION ~>
+      "(" ~> numericLit <~ ")") ~ (WITH ~> DATA).? <~ opt(";") ^^ {
+      case dbName ~ table ~ partitionId ~ withData =>
+        val dropWithData = withData.getOrElse("NO") match {
+          case "NO" => false
+          case _ => true
+        }
+        val alterTableDropPartitionModel =
+          AlterTableDropPartitionModel(dbName, table, partitionId, 
dropWithData)
+        AlterTableDropPartition(alterTableDropPartitionModel)
+    }
+
+
   protected lazy val alterTable: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) <~ 
opt(";")  ^^ {
       case dbName ~ table ~ (compact ~ compactType) =>
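[Editor's note] For reference, statements matched by the new alterDropPartition rule look like the following. This is only a sketch: the table names and partition ids are illustrative, and spark is assumed to be a SparkSession with the Carbon SQL parser wired in.

// the partition id inside PARTITION(...) is the numeric id, not a column value
spark.sql("ALTER TABLE sales DROP PARTITION(2)")
// the optional WITH DATA clause sets dropWithData = true in AlterTableDropPartitionModel
spark.sql("ALTER TABLE db1.sales DROP PARTITION(3) WITH DATA")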
