This is an automated email from the ASF dual-hosted git repository.

jackylk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 862b97f  [CARBONDATA-3676] Support clean carbon data files of stages.
862b97f is described below

commit 862b97f08a0a7ba71488e3a2602ffc3af7ffa050
Author: liuzhi <371684...@qq.com>
AuthorDate: Tue Feb 4 12:13:38 2020 +0800

    [CARBONDATA-3676] Support clean carbon data files of stages.
    
    Why is this PR needed?
    
    At the end of CarbonInsertFromStageCommand, the stage files are cleared,
    but the data files referenced by those stage files are not. This could
    lead to a large backlog of data files.
    
    What changes were proposed in this PR?
    
    Provide a new command that allows us to delete the data files of stages
    that have already been loaded into the table.
    The new command is CarbonDeleteStageCommand.
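
    For example (syntax as documented in docs/dml-of-carbondata.md below), the
    data files of already-loaded stages older than one hour can be cleaned with:

        DELETE FROM TABLE carbontable STAGE OPTIONS ('retain_hour'='1')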
    
    Does this PR introduce any user interface change?
    Yes
    
    Is any new testcase added?
    Yes
    
    This closes #3602
---
 .../carbondata/core/util/path/CarbonTablePath.java |   5 +
 docs/dml-of-carbondata.md                          |  33 +++-
 .../apache/carbon/flink/CarbonLocalProperty.java   |   2 -
 .../org/apache/carbon/flink/CarbonLocalWriter.java |  11 +-
 .../org/apache/carbon/flink/CarbonS3Property.java  |   2 -
 .../org/apache/carbon/flink/CarbonS3Writer.java    |  16 +-
 .../carbon/flink/TestCarbonPartitionWriter.scala   |  24 +--
 .../org/apache/carbon/flink/TestCarbonWriter.scala |  15 +-
 ...rbonWriter.scala => TestDeleteStageFiles.scala} | 119 +++++++++++---
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   2 +-
 .../management/CarbonDeleteStageFilesCommand.scala | 182 +++++++++++++++++++++
 .../parser/CarbonExtensionSpark2SqlParser.scala    |   4 +-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  29 +++-
 13 files changed, 341 insertions(+), 103 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 50bbe1d..cb34c59 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -56,6 +56,7 @@ public class CarbonTablePath {
   private static final String STREAMING_LOG_DIR = "log";
   private static final String STREAMING_CHECKPOINT_DIR = "checkpoint";
   private static final String STAGE_DIR = "stage";
+  private static final String STAGE_DATA_DIR = "stage_data";
   public static final String  SUCCESS_FILE_SUBFIX = ".success";
   private static final String SNAPSHOT_FILE_NAME = "snapshot";
 
@@ -69,6 +70,10 @@ public class CarbonTablePath {
     return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
STAGE_DIR;
   }
 
+  public static String getStageDataDir(String tablePath) {
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + 
STAGE_DATA_DIR;
+  }
+
   public static String getStageSnapshotFile(String tablePath) {
     return CarbonTablePath.getStageDir(tablePath) + 
CarbonCommonConstants.FILE_SEPARATOR +
         SNAPSHOT_FILE_NAME;
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 1cda75a..9d935c8 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -331,7 +331,7 @@ CarbonData DML statements are documented here,which 
includes:
     The number of stage files per processing.
 
     ``` 
-    OPTIONS('batch_file_count'=',')
+    OPTIONS('batch_file_count'='5')
     ```
 
   Examples:
@@ -446,6 +446,37 @@ CarbonData DML statements are documented here,which 
includes:
   ```
   DELETE FROM carbontable WHERE column1 IN (SELECT column11 FROM sourceTable2 
WHERE column1 = 'USA')
   ```
+    
+### DELETE STAGE
+
+  This command allows us to delete the data files (stage data) which have already been loaded into the table.
+  ```
+  DELETE FROM TABLE [db_name.]table_name STAGE OPTIONS(property_name=property_value, ...)
+  ```  
+  **Supported Properties:**
+
+| Property                    | Description                    |
+| --------------------------- | ------------------------------ |
+| [retain_hour](#retain_hour) | Data file retain time in hours |
+
+  You can use the following options to delete data:
+  - ##### retain_hour:
+    Data file retain time in hours; the command only deletes data files older than this retention period.
+
+    ``` 
+    OPTIONS('retain_hour'='1')
+    ```
+
+  Examples:
+
+  ```
+  DELETE FROM TABLE carbontable STAGE
+  ```
+
+  ```
+  DELETE FROM TABLE carbontable STAGE OPTIONS ('retain_hour'='1')
+  ```
 
 ## COMPACTION
 
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
index c1be532..e87fc67 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
@@ -21,8 +21,6 @@ public final class CarbonLocalProperty {
 
   public static final String DATA_TEMP_PATH = 
"carbon.writer.local.data.temp.path";
 
-  public static final String DATA_PATH = "carbon.writer.local.data.path";
-
   static final String COMMIT_THRESHOLD = 
"carbon.writer.local.commit.threshold";
 
   private CarbonLocalProperty() {
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
index c80e9b3..10a5fad 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -150,16 +150,7 @@ final class CarbonLocalWriter extends CarbonWriter {
       LOGGER.debug("Commit write. " + this.toString());
     }
     try {
-      final Properties writerProperties =
-          this.getFactory().getConfiguration().getWriterProperties();
-      String dataPath = 
writerProperties.getProperty(CarbonLocalProperty.DATA_PATH);
-      if (dataPath == null) {
-        throw new IllegalArgumentException(
-                "Writer property [" + CarbonLocalProperty.DATA_PATH + "] is 
not set."
-        );
-      }
-      dataPath = dataPath + this.table.getDatabaseName() + 
CarbonCommonConstants.FILE_SEPARATOR
-          + this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR;
+      String dataPath = 
CarbonTablePath.getStageDataDir(this.table.getTablePath());
       tryCreateLocalDirectory(new File(dataPath));
       StageInput stageInput = this.uploadSegmentDataFiles(this.writePath, 
dataPath);
       if (stageInput == null) {
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
index 66fa12b..cb7a4a8 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
@@ -27,8 +27,6 @@ final class CarbonS3Property {
 
   static final String DATA_TEMP_PATH = "carbon.writer.s3.data.temp.path";
 
-  static final String DATA_PATH = "carbon.writer.s3.data.path";
-
   static final String COMMIT_THRESHOLD = "carbon.writer.s3.commit.threshold";
 
   private CarbonS3Property() {
diff --git 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
index 72e4405..30dad93 100644
--- 
a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
+++ 
b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -160,21 +160,7 @@ final class CarbonS3Writer extends CarbonWriter {
     ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
         .getNonSerializableExtraInfo().put("carbonConf", this.configuration);
     try {
-      final Properties writerProperties =
-          this.getFactory().getConfiguration().getWriterProperties();
-      String dataPath = 
writerProperties.getProperty(CarbonS3Property.DATA_PATH);
-      if (dataPath == null) {
-        throw new IllegalArgumentException(
-                "Writer property [" + CarbonS3Property.DATA_PATH + "] is not 
set."
-        );
-      }
-      if (!dataPath.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
-        throw new IllegalArgumentException(
-                "Writer property [" + CarbonS3Property.DATA_PATH + "] is not a 
s3a path."
-        );
-      }
-      dataPath = dataPath + this.table.getDatabaseName() + 
CarbonCommonConstants.FILE_SEPARATOR +
-          this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR;
+      String dataPath = 
CarbonTablePath.getStageDataDir(this.table.getTablePath());
       StageInput stageInput = this.uploadSegmentDataFiles(this.writePath, 
dataPath);
       if (stageInput == null) {
         return;
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
index c92d6fc..6ca877c 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonPartitionWriter.scala
@@ -56,14 +56,11 @@ class TestCarbonPartitionWriter extends QueryTest {
     val rootPath = System.getProperty("user.dir") + "/target/test-classes"
 
     val dataTempPath = rootPath + "/data/temp/"
-    val dataPath = rootPath + "/data/"
-    delDir(new File(dataPath))
-    new File(dataPath).mkdir()
 
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
 
-      val writerProperties = newWriterProperties(dataTempPath, dataPath, 
storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
@@ -110,11 +107,6 @@ class TestCarbonPartitionWriter extends QueryTest {
           // TODO
           throw new UnsupportedOperationException(exception)
       }
-
-      val dataLocation = dataPath + "default" + 
CarbonCommonConstants.FILE_SEPARATOR +
-        tableName + CarbonCommonConstants.FILE_SEPARATOR
-
-      assertResult(true)(FileFactory.isFileExist(dataLocation))
       assertResult(false)(FileFactory
         
.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
@@ -124,7 +116,6 @@ class TestCarbonPartitionWriter extends QueryTest {
 
     } finally {
       sql(s"DROP TABLE IF EXISTS $tableName").collect()
-      delDir(new File(dataPath))
     }
   }
 
@@ -143,14 +134,11 @@ class TestCarbonPartitionWriter extends QueryTest {
     val rootPath = System.getProperty("user.dir") + "/target/test-classes"
 
     val dataTempPath = rootPath + "/data/temp/"
-    val dataPath = rootPath + "/data/"
-    delDir(new File(dataPath))
-    new File(dataPath).mkdir()
 
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
 
-      val writerProperties = newWriterProperties(dataTempPath, dataPath, 
storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
@@ -199,11 +187,6 @@ class TestCarbonPartitionWriter extends QueryTest {
           // TODO
           throw new UnsupportedOperationException(exception)
       }
-
-      val dataLocation = dataPath + "default" + 
CarbonCommonConstants.FILE_SEPARATOR +
-                         tableName + CarbonCommonConstants.FILE_SEPARATOR
-
-      assertResult(true)(FileFactory.isFileExist(dataLocation))
       assertResult(false)(FileFactory
         
.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
@@ -217,17 +200,14 @@ class TestCarbonPartitionWriter extends QueryTest {
 
     } finally {
       sql(s"DROP TABLE IF EXISTS $tableName").collect()
-      delDir(new File(dataPath))
     }
   }
 
   private def newWriterProperties(
      dataTempPath: String,
-     dataPath: String,
      storeLocation: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
-    properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
     properties
   }
 
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
index a297dcf..72625dc 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -17,7 +17,6 @@
 
 package org.apache.carbon.flink
 
-import java.io.File
 import java.util.Properties
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -47,14 +46,11 @@ class TestCarbonWriter extends QueryTest {
     val rootPath = System.getProperty("user.dir") + "/target/test-classes"
 
     val dataTempPath = rootPath + "/data/temp/"
-    val dataPath = rootPath + "/data/"
-    new File(dataPath).delete()
-    new File(dataPath).mkdir()
 
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
 
-      val writerProperties = newWriterProperties(dataTempPath, dataPath, 
storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
@@ -109,7 +105,6 @@ class TestCarbonWriter extends QueryTest {
 
     } finally {
       sql(s"DROP TABLE IF EXISTS $tableName").collect()
-      new File(dataPath).delete()
     }
   }
 
@@ -125,14 +120,11 @@ class TestCarbonWriter extends QueryTest {
     val rootPath = System.getProperty("user.dir") + "/target/test-classes"
 
     val dataTempPath = rootPath + "/data/temp/"
-    val dataPath = rootPath + "/data/"
-    new File(dataPath).delete()
-    new File(dataPath).mkdir()
 
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
 
-      val writerProperties = newWriterProperties(dataTempPath, dataPath, 
storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
@@ -186,17 +178,14 @@ class TestCarbonWriter extends QueryTest {
       checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
     } finally {
       sql(s"DROP TABLE IF EXISTS $tableName").collect()
-      new File(dataPath).delete()
     }
   }
 
   private def newWriterProperties(
     dataTempPath: String,
-    dataPath: String,
     storeLocation: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
-    properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
     properties
   }
 
diff --git 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestDeleteStageFiles.scala
similarity index 64%
copy from 
integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
copy to 
integration/flink/src/test/scala/org/apache/carbon/flink/TestDeleteStageFiles.scala
index a297dcf..a84ede4 100644
--- 
a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
+++ 
b/integration/flink/src/test/scala/org/apache/carbon/flink/TestDeleteStageFiles.scala
@@ -20,7 +20,6 @@ package org.apache.carbon.flink
 import java.io.File
 import java.util.Properties
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.flink.api.common.restartstrategy.RestartStrategies
 import org.apache.flink.core.fs.Path
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
@@ -28,14 +27,15 @@ import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
-class TestCarbonWriter extends QueryTest {
+class TestDeleteStageFiles extends QueryTest {
 
-  val tableName = "test_flink"
+  val tableName = "test_flink_delete_stage_file"
 
-  test("Writing flink data to local carbon table") {
+  test("Delete stage file success") {
     sql(s"DROP TABLE IF EXISTS $tableName").collect()
     sql(
       s"""
@@ -47,14 +47,11 @@ class TestCarbonWriter extends QueryTest {
     val rootPath = System.getProperty("user.dir") + "/target/test-classes"
 
     val dataTempPath = rootPath + "/data/temp/"
-    val dataPath = rootPath + "/data/"
-    new File(dataPath).delete()
-    new File(dataPath).mkdir()
 
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
 
-      val writerProperties = newWriterProperties(dataTempPath, dataPath, 
storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
       val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
@@ -107,44 +104,121 @@ class TestCarbonWriter extends QueryTest {
       
assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
       
assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
 
+      sql(s"DELETE FROM TABLE $tableName STAGE OPTIONS('retain_hour'='0')")
+      val dataLocation = new File(CarbonTablePath.getStageDataDir(tablePath))
+      assertResult(true)(dataLocation.listFiles() == null || 
dataLocation.listFiles().length == 0)
     } finally {
       sql(s"DROP TABLE IF EXISTS $tableName").collect()
-      new File(dataPath).delete()
     }
   }
 
-  test("test batch_file_count option") {
+  test("Delete stage file success for partition table") {
     sql(s"DROP TABLE IF EXISTS $tableName").collect()
     sql(
       s"""
-         | CREATE TABLE $tableName (stringField string, intField int, 
shortField short)
+         | CREATE TABLE $tableName (intField int, shortField short)
          | STORED AS carbondata
+         | PARTITIONED BY (stringField string)
       """.stripMargin
     ).collect()
 
     val rootPath = System.getProperty("user.dir") + "/target/test-classes"
 
     val dataTempPath = rootPath + "/data/temp/"
-    val dataPath = rootPath + "/data/"
-    new File(dataPath).delete()
-    new File(dataPath).mkdir()
 
     try {
       val tablePath = storeLocation + "/" + tableName + "/"
 
-      val writerProperties = newWriterProperties(dataTempPath, dataPath, 
storeLocation)
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
       val carbonProperties = newCarbonProperties(storeLocation)
 
-      writerProperties.put(CarbonLocalProperty.COMMIT_THRESHOLD, "100")
+      val environment = StreamExecutionEnvironment.getExecutionEnvironment
+      environment.setParallelism(1)
+      environment.enableCheckpointing(2000L)
+      environment.setRestartStrategy(RestartStrategies.noRestart)
+
+      val dataCount = 1000
+      val source = new TestSource(dataCount) {
+        @throws[InterruptedException]
+        override def get(index: Int): Array[AnyRef] = {
+          Thread.sleep(1L)
+          val data = new Array[AnyRef](3)
+          data(0) = index.asInstanceOf[AnyRef]
+          data(1) = 12345.asInstanceOf[AnyRef]
+          data(2) = "test" + (index % 10)
+          data
+        }
+
+        @throws[InterruptedException]
+        override def onFinish(): Unit = {
+          Thread.sleep(5000L)
+        }
+      }
+      val stream = environment.addSource(source)
+      val factory = CarbonWriterFactory.builder("Local").build(
+        "default",
+        tableName,
+        tablePath,
+        new Properties,
+        writerProperties,
+        carbonProperties
+      )
+      val streamSink = StreamingFileSink.forBulkFormat(new 
Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+      stream.addSink(streamSink)
+
+      try environment.execute
+      catch {
+        case exception: Exception =>
+          // TODO
+          throw new UnsupportedOperationException(exception)
+      }
+
+      sql(s"INSERT INTO $tableName STAGE")
+
+      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
+
+      // ensure the stage snapshot file and all stage files are deleted
+      
assertResult(false)(FileFactory.isFileExist(CarbonTablePath.getStageSnapshotFile(tablePath)))
+      
assertResult(true)(FileFactory.getCarbonFile(CarbonTablePath.getStageDir(tablePath)).listFiles().isEmpty)
+
+      sql(s"DELETE FROM TABLE $tableName STAGE OPTIONS('retain_hour'='0')")
+      val dataLocation = new File(CarbonTablePath.getStageDataDir(tablePath))
+      assertResult(true)(dataLocation.listFiles() == null || 
dataLocation.listFiles().length == 0)
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $tableName").collect()
+    }
+  }
+
+  test("All data files are referenced by stage, not delete any data files.") {
+    sql(s"DROP TABLE IF EXISTS $tableName").collect()
+    sql(
+      s"""
+         | CREATE TABLE $tableName (stringField string, intField int, 
shortField short)
+         | STORED AS carbondata
+      """.stripMargin
+    ).collect()
+
+    val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+    val dataTempPath = rootPath + "/data/temp/"
+
+    try {
+      val tablePath = storeLocation + "/" + tableName + "/"
+
+      val writerProperties = newWriterProperties(dataTempPath, storeLocation)
+      val carbonProperties = newCarbonProperties(storeLocation)
 
       val environment = StreamExecutionEnvironment.getExecutionEnvironment
       environment.setParallelism(1)
+      environment.enableCheckpointing(2000L)
       environment.setRestartStrategy(RestartStrategies.noRestart)
 
       val dataCount = 1000
       val source = new TestSource(dataCount) {
         @throws[InterruptedException]
         override def get(index: Int): Array[AnyRef] = {
+          Thread.sleep(1L)
           val data = new Array[AnyRef](3)
           data(0) = "test" + index
           data(1) = index.asInstanceOf[AnyRef]
@@ -177,26 +251,19 @@ class TestCarbonWriter extends QueryTest {
           throw new UnsupportedOperationException(exception)
       }
 
-      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
-
-      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(500)))
-
-      sql(s"INSERT INTO $tableName STAGE OPTIONS ('batch_file_count' = '5')")
-
-      checkAnswer(sql(s"SELECT count(1) FROM $tableName"), Seq(Row(1000)))
+      sql(s"DELETE FROM TABLE $tableName STAGE")
+      val dataLocation = new File(CarbonTablePath.getStageDataDir(tablePath))
+      assertResult(true)(dataLocation.listFiles().length > 0)
     } finally {
       sql(s"DROP TABLE IF EXISTS $tableName").collect()
-      new File(dataPath).delete()
     }
   }
 
   private def newWriterProperties(
     dataTempPath: String,
-    dataPath: String,
     storeLocation: String) = {
     val properties = new Properties
     properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
-    properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
     properties
   }
 
diff --git 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index cfc5425..66f3ed3 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -218,7 +218,7 @@ abstract class CarbonDDLSqlParser extends 
AbstractCarbonSparkSQLParser {
         }
     }
 
-  protected lazy val loadOptions: Parser[(String, String)] =
+  protected lazy val options: Parser[(String, String)] =
     (stringLit <~ "=") ~ stringLit ^^ {
       case opt ~ optvalue => (opt.trim.toLowerCase(), optvalue)
       case _ => ("", "")
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
new file mode 100644
index 0000000..7930897
--- /dev/null
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteStageFilesCommand.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.management
+
+import java.io.InputStreamReader
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+import com.google.gson.Gson
+import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+
+import 
org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.statusmanager.StageInput
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Delete carbon data files of table stages.
+ *
+ * @param databaseNameOp database name
+ * @param tableName      table name
+ */
+case class CarbonDeleteStageFilesCommand(
+    databaseNameOp: Option[String],
+    tableName: String,
+    options: Map[String, String]
+) extends DataCommand {
+
+  @transient val LOGGER: Logger = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  override def processData(spark: SparkSession): Seq[Row] = {
+    Checker.validateTableExists(databaseNameOp, tableName, spark)
+    val table = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(spark)
+    val configuration = spark.sessionState.newHadoopConf()
+    setAuditTable(table)
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non 
transactional table")
+    }
+    if (table.isChildTableForMV) {
+      throw new MalformedCarbonCommandException("Unsupported operation on MV 
table")
+    }
+    val tablePath = table.getTablePath
+    val startTime = System.currentTimeMillis()
+    val stageDataFileActiveTime = try {
+      Integer.valueOf(options.getOrElse("retain_hour", "0")) * 3600000
+    } catch {
+      case _: NumberFormatException =>
+        throw new MalformedCarbonCommandException(
+          "Option [retain_hour] is not a number.")
+    }
+    if (stageDataFileActiveTime < 0) {
+      throw new MalformedCarbonCommandException(
+        "Option [retain_hour] is negative.")
+    }
+    val stageDataFilesReferenced =
+      listStageDataFilesReferenced(listStageMetadataFiles(tablePath, 
configuration), configuration)
+    val stageDataFiles = listStageDataFiles(tablePath, configuration)
+    stageDataFiles.collect {
+      case stageDataFile: CarbonFile =>
+        // Which file will be deleted:
+        // 1. Not referenced by any stage file;
+        // 2. Has passed retain time.
+        if (!stageDataFilesReferenced.contains(stageDataFile.getCanonicalPath) 
&&
+            (startTime - stageDataFile.getLastModifiedTime) >= 
stageDataFileActiveTime) {
+          stageDataFile.delete()
+        }
+    }
+    Seq.empty
+  }
+
+  private def listStageMetadataFiles(
+      tablePath: String,
+      configuration: Configuration
+  ): Seq[CarbonFile] = {
+    val stagePath = CarbonTablePath.getStageDir(tablePath)
+    val stageDirectory = FileFactory.getCarbonFile(stagePath, configuration)
+    if (stageDirectory.exists()) {
+      stageDirectory.listFiles().filter { file =>
+        !file.getName.endsWith(CarbonTablePath.SUCCESS_FILE_SUBFIX)
+      }
+    } else {
+      Seq.empty
+    }
+  }
+
+  private def listStageDataFiles(
+      tablePath: String,
+      configuration: Configuration
+  ): Seq[CarbonFile] = {
+    val stageDataFileLocation = FileFactory.getCarbonFile(
+      CarbonTablePath.getStageDataDir(tablePath),
+      configuration
+    )
+    if (!stageDataFileLocation.exists()) {
+      LOGGER.warn(
+        "Stage data file location does not exist: " + CarbonTablePath.getStageDataDir(tablePath)
+      )
+      Seq.empty
+    } else {
+      stageDataFileLocation.listFiles(true).asScala
+    }
+  }
+
+  /**
+   * Collect the paths of data files which are referenced by stage files (i.e. not yet loaded into the table).
+   */
+  private def listStageDataFilesReferenced(
+      stageFiles: Seq[CarbonFile],
+      configuration: Configuration
+  ): Set[String] = {
+    if (stageFiles.isEmpty) {
+      return Set.empty
+    }
+    // Collect stage data files.
+    val stageDataFilesReferenced = Collections.synchronizedSet(new 
util.HashSet[String]())
+    val startTime = System.currentTimeMillis()
+    stageFiles.foreach { stageFile =>
+      val stream = FileFactory.getDataInputStream(stageFile.getAbsolutePath, 
configuration)
+      try {
+        val stageInput =
+          new Gson().fromJson(new InputStreamReader(stream), 
classOf[StageInput])
+        val stageDataBase = stageInput.getBase + 
CarbonCommonConstants.FILE_SEPARATOR
+        if (stageInput.getFiles != null) {
+          // For non-partition table.
+          stageInput.getFiles.asScala.foreach(
+            stageDataFile =>
+              stageDataFilesReferenced.add(
+                FileFactory.getCarbonFile(
+                  stageDataBase + stageDataFile._1,
+                  configuration
+                ).getCanonicalPath
+              )
+          )
+        }
+        if (stageInput.getLocations != null) {
+          // For partition table.
+          stageInput.getLocations.asScala.foreach(
+            stageDataLocation =>
+              stageDataLocation.getFiles.asScala.foreach(
+                stageDataFile =>
+                  stageDataFilesReferenced.add(
+                    FileFactory.getCarbonFile(
+                      stageDataBase + stageDataFile._1,
+                      configuration
+                    ).getCanonicalPath
+                  )
+              )
+          )
+        }
+      } finally {
+        stream.close()
+      }
+    }
+    LOGGER.info(s"Read stage files taken ${ System.currentTimeMillis() - 
startTime }ms.")
+    stageDataFilesReferenced.asScala.toSet
+  }
+
+  override protected def opName: String = "DELETE STAGE"
+}
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSpark2SqlParser.scala
index 51b714a..6632762 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonExtensionSpark2SqlParser.scala
@@ -53,7 +53,7 @@ class CarbonExtensionSpark2SqlParser extends 
CarbonSpark2SqlParser {
   override protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (ADD ~> COLUMNS ~> "(" ~> repsep(anyFieldDef, ",") <~ ")") ~
-    (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")") <~ opt(";") ^^ {
+    (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")") <~ opt(";") ^^ {
       case dbName ~ table ~ fields ~ tblProp =>
         CarbonSparkSqlParserUtil.alterTableAddColumns(
           dbName, table, fields, Option(tblProp))
@@ -66,7 +66,7 @@ class CarbonExtensionSpark2SqlParser extends 
CarbonSpark2SqlParser {
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
     (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
     (PARTITION ~> "(" ~> repsep(partitions, ",") <~ ")").? ~
-    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")") <~ opt(";") ^^ {
+    (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")") <~ opt(";") ^^ {
       case filePath ~ isOverwrite ~ table ~ partitions ~ optionsList =>
         val (databaseNameOp, tableName) = table match {
           case databaseName ~ tableName => (databaseName, 
tableName.toLowerCase())
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 6b69ec3..99434bf 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -80,7 +80,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     cacheManagement | alterDataMap | insertStageData
 
   protected lazy val loadManagement: Parser[LogicalPlan] =
-    deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | addLoad
+    deleteLoadsByID | deleteLoadsByLoadDate | deleteStage | cleanFiles | 
addLoad
 
   protected lazy val restructure: Parser[LogicalPlan] = alterTableDropColumn
 
@@ -125,9 +125,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
    * AS SELECT COUNT(COL1) FROM tableName
    */
   protected lazy val createStream: Parser[LogicalPlan] =
-    CREATE ~> STREAM ~>  opt(IF ~> NOT ~> EXISTS) ~ ident ~
+    CREATE ~> STREAM ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~
     (ON ~> TABLE ~> (ident <~ ".").?) ~ ident ~
-    (STMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+    (STMPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? ~
     (AS ~> restInput) <~ opt(";") ^^ {
       case ifNotExists ~ streamName ~ dbName ~ tableName ~ options ~ query =>
         val optionMap = options.getOrElse(List[(String, 
String)]()).toMap[String, String]
@@ -167,7 +167,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     opt(ontable) ~
     (USING ~> stringLit) ~
     opt(WITH ~> DEFERRED ~> REBUILD) ~
-    (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
+    (DMPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? ~
     (AS ~> restInput).? <~ opt(";") ^^ {
       case ifnotexists ~ dmname ~ tableIdent ~ dmProviderName ~ deferred ~ 
dmprops ~ query =>
         val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, 
String]
@@ -454,7 +454,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
     (INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
     (PARTITION ~> "(" ~> repsep(partitions, ",") <~ ")").? ~
-    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
+    (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
       case filePath ~ isOverwrite ~ table ~ partitions ~ optionsList =>
         val (databaseNameOp, tableName) = table match {
           case databaseName ~ tableName => (databaseName, 
tableName.toLowerCase())
@@ -487,6 +487,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     }
 
   /**
+   * DELETE FROM TABLE [dbName.]tableName STAGE OPTIONS (key1=value1, 
key2=value2, ...)
+   */
+  protected lazy val deleteStage: Parser[LogicalPlan] =
+    DELETE ~> FROM ~> TABLE ~> (ident <~ ".").? ~ ident ~
+    STAGE ~ (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
+      case database ~ table ~ _ ~ options =>
+            CarbonDeleteStageFilesCommand(database, table,
+              options.getOrElse(List[(String, String)]()).toMap[String, 
String])
+    }
+
+  /**
    * ALTER TABLE [dbName.]tableName ADD SEGMENT
    * OPTIONS('path'='path','format'='format', ['partition'='schema list'])
    *
@@ -495,7 +506,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
    */
   protected lazy val addLoad: Parser[LogicalPlan] =
     ALTER ~ TABLE ~> (ident <~ ".").? ~ ident ~ (ADD ~> SEGMENT) ~
-    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")") <~ opt(";") ^^ {
+    (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")") <~ opt(";") ^^ {
       case dbName ~ tableName ~ segment ~ optionsList =>
         CarbonAddLoadCommand(dbName, tableName, optionsList.toMap)
     }
@@ -505,8 +516,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
    */
   protected lazy val insertStageData: Parser[LogicalPlan] =
     INSERT ~ INTO ~> (ident <~ ".").? ~ ident ~ STAGE ~
-    (OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
-      case dbName ~ tableName ~ stage ~ options =>
+    (OPTIONS ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
+      case dbName ~ tableName ~ _ ~ options =>
         CarbonInsertFromStageCommand(dbName, tableName,
           options.getOrElse(List[(String, String)]()).toMap[String, String])
     }
@@ -575,7 +586,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
   protected lazy val alterTableAddColumns: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~
     (ADD ~> COLUMNS ~> "(" ~> repsep(anyFieldDef, ",") <~ ")") ~
-    (TBLPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ 
{
+    (TBLPROPERTIES ~> "(" ~> repsep(options, ",") <~ ")").? <~ opt(";") ^^ {
       case dbName ~ table ~ fields ~ tblProp =>
         CarbonSparkSqlParserUtil.alterTableAddColumns(
           dbName, table, fields, tblProp)
