[CARBONDATA-1999] Block drop table and delete streaming segment while streaming 
is in progress

1. Block drop table while streaming is in progress
2. Block deletion of a streaming segment

This closes #1773


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1b72a02b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1b72a02b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1b72a02b

Branch: refs/heads/branch-1.3
Commit: 1b72a02be2f176bcc99dc12f7621a0623d39b973
Parents: 90921eb
Author: QiangCai <[email protected]>
Authored: Mon Jan 8 11:14:00 2018 +0800
Committer: Jacky Li <[email protected]>
Committed: Tue Jan 9 00:49:27 2018 +0800

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 42 ++++++-----
 .../command/table/CarbonDropTableCommand.scala  |  4 +
 .../sql/execution/strategy/DDLStrategy.scala    |  2 +-
 .../TestStreamingTableOperation.scala           | 77 ++++++++++++++------
 4 files changed, 85 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 7804ea8..e1fadcf 100755
--- 
a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -473,6 +473,7 @@ public class SegmentStatusManager {
    */
   private static List<String> updateDeletionStatus(List<String> loadIds,
       LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> 
invalidLoadIds) {
+    SegmentStatus segmentStatus = null;
     for (String loadId : loadIds) {
       boolean loadFound = false;
       // For each load id loop through data and if the
@@ -481,26 +482,29 @@ public class SegmentStatusManager {
       for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
 
         if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
-          // if the segment is compacted then no need to delete that.
-          if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
+          segmentStatus = loadMetadata.getSegmentStatus();
+          if (SegmentStatus.COMPACTED == segmentStatus) {
+            // if the segment is compacted then no need to delete that.
             LOG.error("Cannot delete the Segment which is compacted. Segment 
is " + loadId);
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          }
-          // if the segment status is in progress then no need to delete that.
-          if (SegmentStatus.INSERT_IN_PROGRESS == 
loadMetadata.getSegmentStatus()) {
+          } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus) {
+            // if the segment status is in progress then no need to delete 
that.
             LOG.error("Cannot delete the segment " + loadId + " which is load 
in progress");
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          }
-          // if the segment status is overwrite in progress, then no need to 
delete that.
-          if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == 
loadMetadata.getSegmentStatus()) {
-            LOG.error("Cannot delete the segemnt " + loadId + " which is load 
overwrite " +
+          } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == 
segmentStatus) {
+            // if the segment status is overwrite in progress, then no need to 
delete that.
+            LOG.error("Cannot delete the segment " + loadId + " which is load 
overwrite " +
                     "in progress");
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
-          }
-          if (SegmentStatus.MARKED_FOR_DELETE != 
loadMetadata.getSegmentStatus()) {
+          } else if (SegmentStatus.STREAMING == segmentStatus) {
+            // if the segment status is streaming, the segment can't be 
deleted directly.
+            LOG.error("Cannot delete the segment " + loadId + " which is 
streaming in progress");
+            invalidLoadIds.add(loadId);
+            return invalidLoadIds;
+          } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) {
             loadFound = true;
             loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
             
loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
@@ -535,17 +539,20 @@ public class SegmentStatusManager {
     // the metadata as deleted.
     boolean loadFound = false;
     String loadStartTimeString = "Load Start Time: ";
+    SegmentStatus segmentStatus = null;
     for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
       Integer result = 
compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
       if (result < 0) {
-        if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
+        segmentStatus = loadMetadata.getSegmentStatus();
+        if (SegmentStatus.COMPACTED == segmentStatus) {
           LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
               + "as the segment has been compacted.");
-          continue;
-        }
-        if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus() 
&&
-            SegmentStatus.INSERT_IN_PROGRESS != 
loadMetadata.getSegmentStatus() &&
-            SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != 
loadMetadata.getSegmentStatus()) {
+        } else if (SegmentStatus.STREAMING == segmentStatus) {
+          LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
+              + "as the segment is streaming in progress.");
+        } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus &&
+            SegmentStatus.INSERT_IN_PROGRESS != segmentStatus &&
+            SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != segmentStatus) {
           loadFound = true;
           updateSegmentMetadataDetails(loadMetadata);
           LOG.info("Info: " +
@@ -553,7 +560,6 @@ public class SegmentStatusManager {
               " Marked for Delete");
         }
       }
-
     }
 
     if (!loadFound) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 9901f8c..aaad207 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -55,6 +55,10 @@ case class CarbonDropTableCommand(
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, 
tableName)(sparkSession)
+      if (carbonTable.isStreamingTable) {
+        // streaming table should acquire streaming.lock
+        carbonLocks += CarbonLockUtil.getLockObject(identifier, 
LockUsage.STREAMING_LOCK)
+      }
       val relationIdentifiers = 
carbonTable.getTableInfo.getParentRelationIdentifiers
       if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
         if (!dropChildTable) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 2805114..6ff762a 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -223,7 +223,7 @@ class DDLStrategy(sparkSession: SparkSession) extends 
SparkStrategy {
         if (property.isDefined) {
           if (!property.get._2.trim.equalsIgnoreCase("true")) {
             throw new MalformedCarbonCommandException(
-              "Streaming property can not be changed to 'false' once it is 
'true'")
+              "Streaming property can not be changed once it is 'true'")
           }
         }
         ExecutedCommandExec(CarbonAlterTableSetCommand(tableName, properties, 
isView)) :: Nil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 62ab9af..a8ab6fb 100644
--- 
a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,7 +32,6 @@ import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -105,7 +104,8 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     createTable(tableName = "stream_table_tolerant", streaming = true, 
withBatchLoad = true)
 
     // 11. table for delete segment test
-    createTable(tableName = "stream_table_delete", streaming = true, 
withBatchLoad = false)
+    createTable(tableName = "stream_table_delete_id", streaming = true, 
withBatchLoad = false)
+    createTable(tableName = "stream_table_delete_date", streaming = true, 
withBatchLoad = false)
 
     // 12. reject alter streaming properties
     createTable(tableName = "stream_table_alter", streaming = false, 
withBatchLoad = false)
@@ -126,6 +126,8 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     // 17. reopen streaming table after close
     createTable(tableName = "stream_table_reopen", streaming = true, 
withBatchLoad = false)
 
+    // 18. block drop table while streaming is in progress
+    createTable(tableName = "stream_table_drop", streaming = true, 
withBatchLoad = false)
   }
 
   test("validate streaming property") {
@@ -204,7 +206,8 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_compact")
     sql("drop table if exists streaming.stream_table_new")
     sql("drop table if exists streaming.stream_table_tolerant")
-    sql("drop table if exists streaming.stream_table_delete")
+    sql("drop table if exists streaming.stream_table_delete_id")
+    sql("drop table if exists streaming.stream_table_delete_date")
     sql("drop table if exists streaming.stream_table_alter")
     sql("drop table if exists streaming.stream_table_handoff")
     sql("drop table if exists streaming.stream_table_finish")
@@ -212,6 +215,7 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     sql("drop table if exists streaming.stream_table_close")
     sql("drop table if exists streaming.stream_table_close_auto_handoff")
     sql("drop table if exists streaming.stream_table_reopen")
+    sql("drop table if exists streaming.stream_table_drop")
   }
 
   // normal table not support streaming ingest
@@ -600,7 +604,6 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     }
 
     sql("alter table streaming.stream_table_compact compact 'minor'")
-    sql("show segments for table streaming.stream_table_compact").show
 
     val result = sql("show segments for table 
streaming.stream_table_compact").collect()
     result.foreach { row =>
@@ -635,48 +638,55 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
 
   test("test deleting streaming segment by ID while ingesting") {
     executeStreamingIngest(
-      tableName = "stream_table_delete",
+      tableName = "stream_table_delete_id",
       batchNums = 6,
       rowNumsEachBatch = 10000,
       intervalOfSource = 3,
       intervalOfIngest = 5,
-      continueSeconds = 15,
+      continueSeconds = 20,
       generateBadRecords = false,
       badRecordAction = "force",
       handoffSize = 1024L * 200,
       autoHandoff = false
     )
-    val beforeDelete = sql("show segments for table 
streaming.stream_table_delete").collect()
-    val segmentId = beforeDelete.map(_.getString(0)).mkString(",")
-    sql(s"delete from table streaming.stream_table_delete where segment.id in 
($segmentId) ")
+    Thread.sleep(10000)
+    val beforeDelete = sql("show segments for table 
streaming.stream_table_delete_id").collect()
+    val segmentIds1 = 
beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",")
+    val msg = intercept[Exception] {
+      sql(s"delete from table streaming.stream_table_delete_id where 
segment.id in ($segmentIds1) ")
+    }
+    assertResult(s"Delete segment by Id is failed. Invalid ID is: 
${beforeDelete.length -1}")(msg.getMessage)
 
-    val rows = sql("show segments for table 
streaming.stream_table_delete").collect()
-    rows.foreach { row =>
+    val segmentIds2 = beforeDelete.filter(_.getString(1).equals("Streaming 
Finish"))
+      .map(_.getString(0)).mkString(",")
+    sql(s"delete from table streaming.stream_table_delete_id where segment.id 
in ($segmentIds2) ")
+    val afterDelete = sql("show segments for table 
streaming.stream_table_delete_id").collect()
+    afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
       
assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
     }
   }
 
   test("test deleting streaming segment by date while ingesting") {
     executeStreamingIngest(
-      tableName = "stream_table_delete",
+      tableName = "stream_table_delete_date",
       batchNums = 6,
       rowNumsEachBatch = 10000,
       intervalOfSource = 3,
       intervalOfIngest = 5,
-      continueSeconds = 15,
+      continueSeconds = 20,
       generateBadRecords = false,
       badRecordAction = "force",
       handoffSize = 1024L * 200,
       autoHandoff = false
     )
-    val beforeDelete = sql("show segments for table 
streaming.stream_table_delete").collect()
-
-    sql(s"delete from table streaming.stream_table_delete where 
segment.starttime before " +
+    Thread.sleep(10000)
+    val beforeDelete = sql("show segments for table 
streaming.stream_table_delete_date").collect()
+    sql(s"delete from table streaming.stream_table_delete_date where 
segment.starttime before " +
         s"'2999-10-01 01:00:00'")
-
-    val rows = sql("show segments for table 
streaming.stream_table_delete").collect()
-    assertResult(beforeDelete.length)(rows.length)
-    rows.foreach { row =>
+    val segmentIds = beforeDelete.filter(_.getString(1).equals("Streaming"))
+    assertResult(1)(segmentIds.length)
+    val afterDelete = sql("show segments for table 
streaming.stream_table_delete_date").collect()
+    afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row =>
       
assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1))
     }
   }
@@ -799,7 +809,6 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
       autoHandoff = false
     )
     sql("alter table streaming.stream_table_finish finish streaming")
-    sql("show segments for table streaming.stream_table_finish").show(100, 
false)
 
     val segments = sql("show segments for table 
streaming.stream_table_finish").collect()
     assert(segments.length == 4 || segments.length == 5)
@@ -938,6 +947,32 @@ class TestStreamingTableOperation extends QueryTest with 
BeforeAndAfterAll {
     )
   }
 
+  test("block drop streaming table while streaming is in progress") {
+    val identifier = new TableIdentifier("stream_table_drop", 
Option("streaming"))
+    val carbonTable = 
CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark)
+      .asInstanceOf[CarbonRelation].metaData.carbonTable
+    val tablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+    var server: ServerSocket = null
+    try {
+      server = getServerSocket
+      val thread1 = createWriteSocketThread(server, 2, 10, 5)
+      val thread2 = createSocketStreamingThread(spark, server.getLocalPort, 
tablePath, identifier, "force", 5, 1024L * 200, false)
+      thread1.start()
+      thread2.start()
+      Thread.sleep(1000)
+      val msg = intercept[Exception] {
+        sql(s"drop table streaming.stream_table_drop")
+      }
+      assertResult("Dropping table streaming.stream_table_drop failed: Acquire 
table lock failed after retry, please try after some time;")(msg.getMessage)
+      thread1.interrupt()
+      thread2.interrupt()
+    } finally {
+      if (server != null) {
+        server.close()
+      }
+    }
+  }
+
   test("do not support creating datamap on streaming table") {
     assert(
       intercept[MalformedCarbonCommandException](

Reply via email to