[CARBONDATA-1999] Block dropping a table and deleting a streaming segment while streaming is in progress
1.Block drop table while streaming is in progress 2.Block delete streaming segment This closes #1773 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1b72a02b Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1b72a02b Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1b72a02b Branch: refs/heads/branch-1.3 Commit: 1b72a02be2f176bcc99dc12f7621a0623d39b973 Parents: 90921eb Author: QiangCai <[email protected]> Authored: Mon Jan 8 11:14:00 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Tue Jan 9 00:49:27 2018 +0800 ---------------------------------------------------------------------- .../statusmanager/SegmentStatusManager.java | 42 ++++++----- .../command/table/CarbonDropTableCommand.scala | 4 + .../sql/execution/strategy/DDLStrategy.scala | 2 +- .../TestStreamingTableOperation.scala | 77 ++++++++++++++------ 4 files changed, 85 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 7804ea8..e1fadcf 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -473,6 +473,7 @@ public class SegmentStatusManager { */ private static List<String> updateDeletionStatus(List<String> loadIds, LoadMetadataDetails[] listOfLoadFolderDetailsArray, List<String> invalidLoadIds) { + SegmentStatus segmentStatus = null; for (String loadId : loadIds) { boolean loadFound = false; // For each 
load id loop through data and if the @@ -481,26 +482,29 @@ public class SegmentStatusManager { for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) { - // if the segment is compacted then no need to delete that. - if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) { + segmentStatus = loadMetadata.getSegmentStatus(); + if (SegmentStatus.COMPACTED == segmentStatus) { + // if the segment is compacted then no need to delete that. LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId); invalidLoadIds.add(loadId); return invalidLoadIds; - } - // if the segment status is in progress then no need to delete that. - if (SegmentStatus.INSERT_IN_PROGRESS == loadMetadata.getSegmentStatus()) { + } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus) { + // if the segment status is in progress then no need to delete that. LOG.error("Cannot delete the segment " + loadId + " which is load in progress"); invalidLoadIds.add(loadId); return invalidLoadIds; - } - // if the segment status is overwrite in progress, then no need to delete that. - if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == loadMetadata.getSegmentStatus()) { - LOG.error("Cannot delete the segemnt " + loadId + " which is load overwrite " + + } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus) { + // if the segment status is overwrite in progress, then no need to delete that. + LOG.error("Cannot delete the segment " + loadId + " which is load overwrite " + "in progress"); invalidLoadIds.add(loadId); return invalidLoadIds; - } - if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) { + } else if (SegmentStatus.STREAMING == segmentStatus) { + // if the segment status is streaming, the segment can't be deleted directly. 
+ LOG.error("Cannot delete the segment " + loadId + " which is streaming in progress"); + invalidLoadIds.add(loadId); + return invalidLoadIds; + } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) { loadFound = true; loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime()); @@ -535,17 +539,20 @@ public class SegmentStatusManager { // the metadata as deleted. boolean loadFound = false; String loadStartTimeString = "Load Start Time: "; + SegmentStatus segmentStatus = null; for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime); if (result < 0) { - if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) { + segmentStatus = loadMetadata.getSegmentStatus(); + if (SegmentStatus.COMPACTED == segmentStatus) { LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() + "as the segment has been compacted."); - continue; - } - if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus() && - SegmentStatus.INSERT_IN_PROGRESS != loadMetadata.getSegmentStatus() && - SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != loadMetadata.getSegmentStatus()) { + } else if (SegmentStatus.STREAMING == segmentStatus) { + LOG.info("Ignoring the segment : " + loadMetadata.getLoadName() + + "as the segment is streaming in progress."); + } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus && + SegmentStatus.INSERT_IN_PROGRESS != segmentStatus && + SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != segmentStatus) { loadFound = true; updateSegmentMetadataDetails(loadMetadata); LOG.info("Info: " + @@ -553,7 +560,6 @@ public class SegmentStatusManager { " Marked for Delete"); } } - } if (!loadFound) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 9901f8c..aaad207 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -55,6 +55,10 @@ case class CarbonDropTableCommand( } LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]") carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession) + if (carbonTable.isStreamingTable) { + // streaming table should acquire streaming.lock + carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK) + } val relationIdentifiers = carbonTable.getTableInfo.getParentRelationIdentifiers if (relationIdentifiers != null && !relationIdentifiers.isEmpty) { if (!dropChildTable) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 2805114..6ff762a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -223,7 +223,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (property.isDefined) { if (!property.get._2.trim.equalsIgnoreCase("true")) { throw new MalformedCarbonCommandException( - "Streaming property 
can not be changed to 'false' once it is 'true'") + "Streaming property can not be changed once it is 'true'") } } ExecutedCommandExec(CarbonAlterTableSetCommand(tableName, properties, isView)) :: Nil http://git-wip-us.apache.org/repos/asf/carbondata/blob/1b72a02b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 62ab9af..a8ab6fb 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -32,7 +32,6 @@ import org.apache.spark.sql.types.StructType import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants -import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -105,7 +104,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { createTable(tableName = "stream_table_tolerant", streaming = true, withBatchLoad = true) // 11. table for delete segment test - createTable(tableName = "stream_table_delete", streaming = true, withBatchLoad = false) + createTable(tableName = "stream_table_delete_id", streaming = true, withBatchLoad = false) + createTable(tableName = "stream_table_delete_date", streaming = true, withBatchLoad = false) // 12. 
reject alter streaming properties createTable(tableName = "stream_table_alter", streaming = false, withBatchLoad = false) @@ -126,6 +126,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { // 17. reopen streaming table after close createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false) + // 18. block drop table while streaming is in progress + createTable(tableName = "stream_table_drop", streaming = true, withBatchLoad = false) } test("validate streaming property") { @@ -204,7 +206,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("drop table if exists streaming.stream_table_compact") sql("drop table if exists streaming.stream_table_new") sql("drop table if exists streaming.stream_table_tolerant") - sql("drop table if exists streaming.stream_table_delete") + sql("drop table if exists streaming.stream_table_delete_id") + sql("drop table if exists streaming.stream_table_delete_date") sql("drop table if exists streaming.stream_table_alter") sql("drop table if exists streaming.stream_table_handoff") sql("drop table if exists streaming.stream_table_finish") @@ -212,6 +215,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("drop table if exists streaming.stream_table_close") sql("drop table if exists streaming.stream_table_close_auto_handoff") sql("drop table if exists streaming.stream_table_reopen") + sql("drop table if exists streaming.stream_table_drop") } // normal table not support streaming ingest @@ -600,7 +604,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { } sql("alter table streaming.stream_table_compact compact 'minor'") - sql("show segments for table streaming.stream_table_compact").show val result = sql("show segments for table streaming.stream_table_compact").collect() result.foreach { row => @@ -635,48 +638,55 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { 
test("test deleting streaming segment by ID while ingesting") { executeStreamingIngest( - tableName = "stream_table_delete", + tableName = "stream_table_delete_id", batchNums = 6, rowNumsEachBatch = 10000, intervalOfSource = 3, intervalOfIngest = 5, - continueSeconds = 15, + continueSeconds = 20, generateBadRecords = false, badRecordAction = "force", handoffSize = 1024L * 200, autoHandoff = false ) - val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect() - val segmentId = beforeDelete.map(_.getString(0)).mkString(",") - sql(s"delete from table streaming.stream_table_delete where segment.id in ($segmentId) ") + Thread.sleep(10000) + val beforeDelete = sql("show segments for table streaming.stream_table_delete_id").collect() + val segmentIds1 = beforeDelete.filter(_.getString(1).equals("Streaming")).map(_.getString(0)).mkString(",") + val msg = intercept[Exception] { + sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds1) ") + } + assertResult(s"Delete segment by Id is failed. 
Invalid ID is: ${beforeDelete.length -1}")(msg.getMessage) - val rows = sql("show segments for table streaming.stream_table_delete").collect() - rows.foreach { row => + val segmentIds2 = beforeDelete.filter(_.getString(1).equals("Streaming Finish")) + .map(_.getString(0)).mkString(",") + sql(s"delete from table streaming.stream_table_delete_id where segment.id in ($segmentIds2) ") + val afterDelete = sql("show segments for table streaming.stream_table_delete_id").collect() + afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row => assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1)) } } test("test deleting streaming segment by date while ingesting") { executeStreamingIngest( - tableName = "stream_table_delete", + tableName = "stream_table_delete_date", batchNums = 6, rowNumsEachBatch = 10000, intervalOfSource = 3, intervalOfIngest = 5, - continueSeconds = 15, + continueSeconds = 20, generateBadRecords = false, badRecordAction = "force", handoffSize = 1024L * 200, autoHandoff = false ) - val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect() - - sql(s"delete from table streaming.stream_table_delete where segment.starttime before " + + Thread.sleep(10000) + val beforeDelete = sql("show segments for table streaming.stream_table_delete_date").collect() + sql(s"delete from table streaming.stream_table_delete_date where segment.starttime before " + s"'2999-10-01 01:00:00'") - - val rows = sql("show segments for table streaming.stream_table_delete").collect() - assertResult(beforeDelete.length)(rows.length) - rows.foreach { row => + val segmentIds = beforeDelete.filter(_.getString(1).equals("Streaming")) + assertResult(1)(segmentIds.length) + val afterDelete = sql("show segments for table streaming.stream_table_delete_date").collect() + afterDelete.filter(!_.getString(1).equals("Streaming")).foreach { row => assertResult(SegmentStatus.MARKED_FOR_DELETE.getMessage)(row.getString(1)) } } @@ -799,7 
+809,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { autoHandoff = false ) sql("alter table streaming.stream_table_finish finish streaming") - sql("show segments for table streaming.stream_table_finish").show(100, false) val segments = sql("show segments for table streaming.stream_table_finish").collect() assert(segments.length == 4 || segments.length == 5) @@ -938,6 +947,32 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { ) } + test("block drop streaming table while streaming is in progress") { + val identifier = new TableIdentifier("stream_table_drop", Option("streaming")) + val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) + .asInstanceOf[CarbonRelation].metaData.carbonTable + val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) + var server: ServerSocket = null + try { + server = getServerSocket + val thread1 = createWriteSocketThread(server, 2, 10, 5) + val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false) + thread1.start() + thread2.start() + Thread.sleep(1000) + val msg = intercept[Exception] { + sql(s"drop table streaming.stream_table_drop") + } + assertResult("Dropping table streaming.stream_table_drop failed: Acquire table lock failed after retry, please try after some time;")(msg.getMessage) + thread1.interrupt() + thread2.interrupt() + } finally { + if (server != null) { + server.close() + } + } + } + test("do not support creating datamap on streaming table") { assert( intercept[MalformedCarbonCommandException](
