[CARBONDATA-1904][CARBONDATA-1905] Support auto handoff and close streaming
Add support for: 1. auto handoff streaming segment 2. alter streaming table to normal table by syntax: alter table compact 'close_streaming' This closes #1736 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a51ad30f Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a51ad30f Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a51ad30f Branch: refs/heads/branch-1.3 Commit: a51ad30f6647387de818dbeb2c3c39cbac2a8416 Parents: 7f3c374 Author: QiangCai <[email protected]> Authored: Thu Dec 28 18:43:43 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Wed Jan 3 08:53:20 2018 +0800 ---------------------------------------------------------------------- .../core/constants/CarbonCommonConstants.java | 7 + .../carbondata/core/util/CarbonProperties.java | 21 ++ .../CarbonAlterTableCompactionCommand.scala | 66 +++++- .../CarbonAlterTableFinishStreaming.scala | 28 ++- .../sql/execution/strategy/DDLStrategy.scala | 3 +- .../TestStreamingTableOperation.scala | 234 +++++++++++++++++-- .../processing/merger/CompactionType.java | 1 + .../streaming/segment/StreamSegment.java | 71 ++---- .../carbondata/streaming/StreamHandoffRDD.scala | 123 +++++----- .../streaming/CarbonAppendableStreamSink.scala | 20 +- 10 files changed, 437 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java index a05d023..fce8373 100644 --- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java +++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java @@ -1488,6 +1488,13 @@ public final class CarbonCommonConstants { public static final String HANDOFF_SIZE = "carbon.streaming.segment.max.size"; /** + * enable auto handoff streaming segment + */ + public static final String ENABLE_AUTO_HANDOFF = "carbon.streaming.auto.handoff.enabled"; + + public static final String ENABLE_AUTO_HANDOFF_DEFAULT = "true"; + + /** * the min handoff size of streaming segment, the unit is byte */ public static final long HANDOFF_SIZE_MIN = 1024L * 1024 * 64; http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java index 7b80a8b..19a3cf3 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java @@ -107,6 +107,7 @@ public final class CarbonProperties { validateCarbonCSVReadBufferSizeByte(); validateHandoffSize(); validateCombineSmallInputFiles(); + validateEnableAutoHandoff(); } private void validateCarbonCSVReadBufferSizeByte() { @@ -297,6 +298,19 @@ public final class CarbonProperties { } } + private void validateEnableAutoHandoff() { + String enableAutoHandoffStr = + carbonProperties.getProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF); + boolean isValid = CarbonUtil.validateBoolean(enableAutoHandoffStr); + if (!isValid) { + LOGGER.warn("The enable auto handoff value \"" + enableAutoHandoffStr + + "\" is invalid. Using the default value \"" + + CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT); + carbonProperties.setProperty(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, + CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT); + } + } + /** * This method validates the number of pages per blocklet column */ @@ -792,6 +806,13 @@ public final class CarbonProperties { return handoffSize; } + public boolean isEnableAutoHandoff() { + String enableAutoHandoffStr = CarbonProperties.getInstance().getProperty( + CarbonCommonConstants.ENABLE_AUTO_HANDOFF, + CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT); + return enableAutoHandoffStr.equalsIgnoreCase("true"); + } + /** * Validate the restrictions * http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 45d3537..6daaae5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -17,27 +17,34 @@ package org.apache.spark.sql.execution.command.management +import java.io.{File, IOException} + import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{AlterTableModel, CarbonMergerMapping, CompactionModel, DataCommand} -import org.apache.spark.sql.hive.CarbonRelation +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.CarbonException +import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory import org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.streaming.StreamHandoffRDD +import org.apache.carbondata.streaming.segment.StreamSegment /** * Command for the compaction in alter table command @@ -143,8 +150,14 @@ case class CarbonAlterTableCompactionCommand( if (compactionType == CompactionType.STREAMING) { StreamHandoffRDD.startStreamingHandoffThread( carbonLoadModel, - sqlContext, - storeLocation) + sqlContext.sparkSession) + return + } + + if (compactionType == CompactionType.CLOSE_STREAMING) { + closeStreamingTable( + carbonLoadModel, + sqlContext.sparkSession) return } @@ -254,4 +267,51 @@ case class CarbonAlterTableCompactionCommand( } } } + + def closeStreamingTable( + carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession + ): Unit = { + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName) + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + // 1. acquire lock of streaming.lock + val streamingLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getTableInfo.getOrCreateAbsoluteTableIdentifier, + LockUsage.STREAMING_LOCK) + try { + if (streamingLock.lockWithRetries()) { + // 2. convert segment status from "streaming" to "streaming finish" + StreamSegment.finishStreaming(carbonTable) + // 3. iterate to handoff all streaming segment to batch segment + StreamHandoffRDD.iterateStreamingHandoff(carbonLoadModel, sparkSession) + val tableIdentifier = + new TableIdentifier(carbonTable.getTableName, Option(carbonTable.getDatabaseName)) + // 4. modify table to normal table + AlterTableUtil.modifyTableProperties( + tableIdentifier, + Map("streaming" -> "false"), + Seq.empty, + true)(sparkSession, + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) + // 5. remove checkpoint + val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) + FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingCheckpointDir)) + FileFactory.deleteAllFilesOfDir(new File(tablePath.getStreamingLogDir)) + } else { + val msg = "Failed to close streaming table, because streaming is locked for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + LOGGER.error(msg) + throw new IOException(msg) + } + } finally { + if (streamingLock.unlock()) { + LOGGER.info("Table unlocked successfully after streaming finished" + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()) + } else { + LOGGER.error("Unable to unlock Table lock for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + " during streaming finished") + } + } + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala index ce83815..59cc0c4 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala @@ -17,9 +17,13 @@ package org.apache.spark.sql.execution.command.management +import java.io.IOException + import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.MetadataCommand +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.streaming.segment.StreamSegment /** @@ -31,7 +35,29 @@ case class CarbonAlterTableFinishStreaming( extends MetadataCommand { override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession) - StreamSegment.finishStreaming(carbonTable) + val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + val streamingLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), + LockUsage.STREAMING_LOCK) + try { + if (streamingLock.lockWithRetries()) { + StreamSegment.finishStreaming(carbonTable) + } else { + val msg = "Failed to finish streaming, because streaming is locked for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + LOGGER.error(msg) + throw new IOException(msg) + } + } finally { + if (streamingLock.unlock()) { + LOGGER.info("Table unlocked successfully after streaming finished" + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()) + } else { + LOGGER.error("Unable to unlock Table lock for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + " during streaming finished") + } + } Seq.empty } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 89fcfd2..45f0f0a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -98,7 +98,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { if (CompactionType.MINOR == compactionType || CompactionType.MAJOR == compactionType || CompactionType.SEGMENT_INDEX == compactionType || - CompactionType.STREAMING == compactionType) { + CompactionType.STREAMING == compactionType || + CompactionType.CLOSE_STREAMING == compactionType) { ExecutedCommandExec(alterTable) :: Nil } else { throw new MalformedCarbonCommandException( http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index f581c72..62ab9af 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.types.StructType import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.spark.exception.MalformedCarbonCommandException @@ -114,6 +115,17 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { // 14. finish streaming createTable(tableName = "stream_table_finish", streaming = true, withBatchLoad = true) + + // 15. auto handoff streaming segment + createTable(tableName = "stream_table_auto_handoff", streaming = true, withBatchLoad = false) + + // 16. close streaming table + createTable(tableName = "stream_table_close", streaming = true, withBatchLoad = false) + createTable(tableName = "stream_table_close_auto_handoff", streaming = true, withBatchLoad = false) + + // 17. reopen streaming table after close + createTable(tableName = "stream_table_reopen", streaming = true, withBatchLoad = false) + } test("validate streaming property") { @@ -196,6 +208,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("drop table if exists streaming.stream_table_alter") sql("drop table if exists streaming.stream_table_handoff") sql("drop table if exists streaming.stream_table_finish") + sql("drop table if exists streaming.stream_table_auto_handoff") + sql("drop table if exists streaming.stream_table_close") + sql("drop table if exists streaming.stream_table_close_auto_handoff") + sql("drop table if exists streaming.stream_table_reopen") } // normal table not support streaming ingest @@ -239,7 +255,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 1, continueSeconds = 10, generateBadRecords = false, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( @@ -280,7 +297,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 1, continueSeconds = 10, generateBadRecords = true, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( sql("select count(*) from streaming.stream_table_socket"), @@ -298,7 +316,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 1, continueSeconds = 10, generateBadRecords = true, - badRecordAction = "fail" + badRecordAction = "fail", + autoHandoff = false ) val result = sql("select count(*) from streaming.bad_record_fail").collect() assert(result(0).getLong(0) < 25) @@ -314,7 +333,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 1, continueSeconds = 20, generateBadRecords = false, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) val result = sql("select count(*) from streaming.stream_table_1s").collect() // 20 seconds can't ingest all data, exists data delay @@ -330,7 +350,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 10, continueSeconds = 50, generateBadRecords = false, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( sql("select count(*) from streaming.stream_table_10s"), @@ -347,7 +368,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 5, continueSeconds = 30, generateBadRecords = false, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( sql("select count(*) from streaming.stream_table_batch"), @@ -370,7 +392,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 4, continueSeconds = 20, generateBadRecords = false, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) val result = sql("select * from streaming.stream_table_scan order by id").collect() @@ -393,7 +416,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 4, continueSeconds = 20, generateBadRecords = false, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) val result = sql("select * from streaming.stream_table_scan_complex order by id").collect() @@ -418,7 +442,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 4, continueSeconds = 20, generateBadRecords = true, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( @@ -458,7 +483,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 4, continueSeconds = 20, generateBadRecords = true, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( @@ -503,7 +529,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 4, continueSeconds = 20, generateBadRecords = true, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( @@ -533,7 +560,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 4, continueSeconds = 20, generateBadRecords = true, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) checkAnswer( @@ -564,7 +592,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { intervalOfIngest = 4, continueSeconds = 20, generateBadRecords = false, - badRecordAction = "force" + badRecordAction = "force", + autoHandoff = false ) for (_ <- 0 to 3) { executeBatchLoad("stream_table_compact") @@ -593,7 +622,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { continueSeconds = 40, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200 + handoffSize = 1024L * 200, + autoHandoff = false ) assert(sql("show segments for table streaming.stream_table_new").count() > 1) @@ -613,7 +643,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { continueSeconds = 15, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200 + handoffSize = 1024L * 200, + autoHandoff = false ) val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect() val segmentId = beforeDelete.map(_.getString(0)).mkString(",") @@ -635,7 +666,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { continueSeconds = 15, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200 + handoffSize = 1024L * 200, + autoHandoff = false ) val beforeDelete = sql("show segments for table streaming.stream_table_delete").collect() @@ -668,7 +700,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { continueSeconds = 40, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200 + handoffSize = 1024L * 200, + autoHandoff = false ) checkAnswer( sql("select count(*) from streaming.stream_table_alter"), @@ -698,7 +731,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { continueSeconds = 40, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200 + handoffSize = 1024L * 200, + autoHandoff = false ) val segments = sql("show segments for table streaming.stream_table_handoff").collect() assert(segments.length == 3 || segments.length == 4) @@ -726,6 +760,31 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { ) } + test("auto handoff 'streaming finish' segment to columnar segment") { + executeStreamingIngest( + tableName = "stream_table_auto_handoff", + batchNums = 6, + rowNumsEachBatch = 10000, + intervalOfSource = 5, + intervalOfIngest = 10, + continueSeconds = 40, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1024L * 200, + autoHandoff = true + ) + Thread.sleep(10000) + val segments = sql("show segments for table streaming.stream_table_auto_handoff").collect() + assertResult(5)(segments.length) + assertResult(2)(segments.filter(_.getString(1).equals("Success")).length) + assertResult(2)(segments.filter(_.getString(1).equals("Compacted")).length) + assertResult(1)(segments.filter(_.getString(1).equals("Streaming")).length) + checkAnswer( + sql("select count(*) from streaming.stream_table_auto_handoff"), + Seq(Row(6 * 10000)) + ) + } + test("alter table finish streaming") { executeStreamingIngest( tableName = "stream_table_finish", @@ -736,7 +795,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { continueSeconds = 40, generateBadRecords = false, badRecordAction = "force", - handoffSize = 1024L * 200 + handoffSize = 1024L * 200, + autoHandoff = false ) sql("alter table streaming.stream_table_finish finish streaming") sql("show segments for table streaming.stream_table_finish").show(100, false) @@ -753,6 +813,131 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { ) } + test("alter table close streaming") { + executeStreamingIngest( + tableName = "stream_table_close", + batchNums = 6, + rowNumsEachBatch = 10000, + intervalOfSource = 5, + intervalOfIngest = 10, + continueSeconds = 40, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1024L * 200, + autoHandoff = false + ) + + val table1 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark) + assertResult(true)(table1.isStreamingTable) + sql("alter table streaming.stream_table_close compact 'close_streaming'") + + val segments = sql("show segments for table streaming.stream_table_close").collect() + assertResult(6)(segments.length) + assertResult(3)(segments.filter(_.getString(1).equals("Success")).length) + assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length) + checkAnswer( + sql("select count(*) from streaming.stream_table_close"), + Seq(Row(6 * 10000)) + ) + val table2 = CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close")(spark) + assertResult(false)(table2.isStreamingTable) + } + + test("alter table close streaming with auto handoff") { + executeStreamingIngest( + tableName = "stream_table_close_auto_handoff", + batchNums = 6, + rowNumsEachBatch = 10000, + intervalOfSource = 5, + intervalOfIngest = 10, + continueSeconds = 40, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1024L * 200, + autoHandoff = true + ) + Thread.sleep(10000) + + val table1 = + CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark) + assertResult(true)(table1.isStreamingTable) + + sql("alter table streaming.stream_table_close_auto_handoff compact 'close_streaming'") + val segments = + sql("show segments for table streaming.stream_table_close_auto_handoff").collect() + assertResult(6)(segments.length) + assertResult(3)(segments.filter(_.getString(1).equals("Success")).length) + assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length) + checkAnswer( + sql("select count(*) from streaming.stream_table_close_auto_handoff"), + Seq(Row(6 * 10000)) + ) + + val table2 = + CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_close_auto_handoff")(spark) + assertResult(false)(table2.isStreamingTable) + } + + test("reopen streaming table") { + executeStreamingIngest( + tableName = "stream_table_reopen", + batchNums = 6, + rowNumsEachBatch = 10000, + intervalOfSource = 5, + intervalOfIngest = 10, + continueSeconds = 40, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1024L * 200, + autoHandoff = true + ) + Thread.sleep(10000) + + val table1 = + CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark) + assertResult(true)(table1.isStreamingTable) + + sql("alter table streaming.stream_table_reopen compact 'close_streaming'") + val segments = + sql("show segments for table streaming.stream_table_reopen").collect() + assertResult(6)(segments.length) + assertResult(3)(segments.filter(_.getString(1).equals("Success")).length) + assertResult(3)(segments.filter(_.getString(1).equals("Compacted")).length) + checkAnswer( + sql("select count(*) from streaming.stream_table_reopen"), + Seq(Row(6 * 10000)) + ) + + val table2 = + CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark) + assertResult(false)(table2.isStreamingTable) + + sql("ALTER TABLE streaming.stream_table_reopen SET TBLPROPERTIES('streaming'='true')") + + val table3 = + CarbonEnv.getCarbonTable(Option("streaming"), "stream_table_reopen")(spark) + assertResult(true)(table3.isStreamingTable) + + executeStreamingIngest( + tableName = "stream_table_reopen", + batchNums = 6, + rowNumsEachBatch = 10000, + intervalOfSource = 5, + intervalOfIngest = 10, + continueSeconds = 40, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1024L * 200, + autoHandoff = true + ) + Thread.sleep(10000) + + checkAnswer( + sql("select count(*) from streaming.stream_table_reopen"), + Seq(Row(6 * 10000 * 2)) + ) + } + test("do not support creating datamap on streaming table") { assert( intercept[MalformedCarbonCommandException]( @@ -828,7 +1013,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { tableIdentifier: TableIdentifier, badRecordAction: String = "force", intervalSecond: Int = 2, - handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT): Thread = { + handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT, + autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean + ): Thread = { new Thread() { override def run(): Unit = { var qry: StreamingQuery = null @@ -848,6 +1035,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { .option("dbName", tableIdentifier.database.get) .option("tableName", tableIdentifier.table) .option(CarbonCommonConstants.HANDOFF_SIZE, handoffSize) + .option(CarbonCommonConstants.ENABLE_AUTO_HANDOFF, autoHandoff) .start() qry.awaitTermination() } catch { @@ -874,7 +1062,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { continueSeconds: Int, generateBadRecords: Boolean, badRecordAction: String, - handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT + handoffSize: Long = CarbonCommonConstants.HANDOFF_SIZE_DEFAULT, + autoHandoff: Boolean = CarbonCommonConstants.ENABLE_AUTO_HANDOFF_DEFAULT.toBoolean ): Unit = { val identifier = new TableIdentifier(tableName, Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) @@ -896,7 +1085,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { tableIdentifier = identifier, badRecordAction = badRecordAction, intervalSecond = intervalOfIngest, - handoffSize = handoffSize) + handoffSize = handoffSize, + autoHandoff = autoHandoff) thread1.start() thread2.start() Thread.sleep(continueSeconds * 1000) http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java index 314afc2..39f56a2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java @@ -29,5 +29,6 @@ public enum CompactionType { IUD_DELETE_DELTA, SEGMENT_INDEX, STREAMING, + CLOSE_STREAMING, NONE } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index 7a62183..69d0b8d 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -186,62 +186,41 @@ public class StreamSegment { * change the status of the segment from "streaming" to "streaming finish" */ public static void finishStreaming(CarbonTable carbonTable) throws IOException { - ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj( + ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj( carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), - LockUsage.STREAMING_LOCK); + LockUsage.TABLE_STATUS_LOCK); try { - if (streamingLock.lockWithRetries()) { - ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj( - carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), - LockUsage.TABLE_STATUS_LOCK); - try { - if (statusLock.lockWithRetries()) { - LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath()); - boolean updated = false; - for (LoadMetadataDetails detail : details) { - if (SegmentStatus.STREAMING == detail.getSegmentStatus()) { - detail.setLoadEndTime(System.currentTimeMillis()); - detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH); - updated = true; - } - } - if (updated) { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); - SegmentStatusManager.writeLoadDetailsIntoFile( - tablePath.getTableStatusFilePath(), details); - } - } else { - String msg = "Failed to acquire table status lock of " + - carbonTable.getDatabaseName() + "." + carbonTable.getTableName(); - LOGGER.error(msg); - throw new IOException(msg); - } - } finally { - if (statusLock.unlock()) { - LOGGER.info("Table unlocked successfully after table status updation" + - carbonTable.getDatabaseName() + "." + carbonTable.getTableName()); - } else { - LOGGER.error("Unable to unlock Table lock for table " + - carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + - " during table status updation"); + if (statusLock.lockWithRetries()) { + LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath()); + boolean updated = false; + for (LoadMetadataDetails detail : details) { + if (SegmentStatus.STREAMING == detail.getSegmentStatus()) { + detail.setLoadEndTime(System.currentTimeMillis()); + detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH); + updated = true; } } + if (updated) { + CarbonTablePath tablePath = + CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); + SegmentStatusManager.writeLoadDetailsIntoFile( + tablePath.getTableStatusFilePath(), + details); + } } else { - String msg = "Failed to finish streaming, because streaming is locked for table " + - carbonTable.getDatabaseName() + "." + carbonTable.getTableName(); + String msg = "Failed to acquire table status lock of " + carbonTable.getDatabaseName() + + "." + carbonTable.getTableName(); LOGGER.error(msg); throw new IOException(msg); } } finally { - if (streamingLock.unlock()) { - LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable - .getDatabaseName() + "." + carbonTable.getTableName()); + if (statusLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation" + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName()); } else { - LOGGER.error("Unable to unlock Table lock for table " + - carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + - " during streaming finished"); + LOGGER.error("Unable to unlock Table lock for table " + carbonTable.getDatabaseName() + + "." + carbonTable.getTableName() + " during table status updation"); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 765a88b..c88575e 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -17,6 +17,7 @@ package org.apache.carbondata.streaming +import java.io.IOException import java.text.SimpleDateFormat import java.util import java.util.Date @@ -25,13 +26,13 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{Job, TaskAttemptID, TaskType} import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskContext} -import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.SparkSession import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.datastore.block.SegmentProperties import org.apache.carbondata.core.datastore.impl.FileFactory -import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} +import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} @@ -45,7 +46,7 @@ import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, C import org.apache.carbondata.processing.util.CarbonLoaderUtil import org.apache.carbondata.spark.{HandoffResult, HandoffResultImpl} import org.apache.carbondata.spark.rdd.CarbonRDD -import org.apache.carbondata.spark.util.CommonUtil +import org.apache.carbondata.streaming.segment.StreamSegment /** * partition of the handoff segment @@ -206,68 +207,73 @@ object StreamHandoffRDD { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + def iterateStreamingHandoff( + carbonLoadModel: CarbonLoadModel, + sparkSession: SparkSession + ): Unit = { + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val identifier = carbonTable.getAbsoluteTableIdentifier + val tablePath = CarbonStorePath.getCarbonTablePath(identifier) + var continueHandoff = false + // require handoff lock on table + val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK) + try { + if (lock.lockWithRetries()) { + LOGGER.info("Acquired the handoff lock for table" + + s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") + // handoff streaming segment one by one + do { + val segmentStatusManager = new SegmentStatusManager(identifier) + var loadMetadataDetails: Array[LoadMetadataDetails] = null + // lock table to read table status file + val statusLock = segmentStatusManager.getTableStatusLock + try { + if (statusLock.lockWithRetries()) { + loadMetadataDetails = SegmentStatusManager.readLoadMetadata( + tablePath.getMetadataDirectoryPath) + } + } finally { + if (null != statusLock) { + statusLock.unlock() + } + } + if (null != loadMetadataDetails) { + val streamSegments = + loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH) + + continueHandoff = streamSegments.length > 0 + if (continueHandoff) { + // handoff a streaming segment + val loadMetadataDetail = streamSegments(0) + executeStreamingHandoff( + carbonLoadModel, + sparkSession, + loadMetadataDetail.getLoadName + ) + } + } else { + continueHandoff = false + } + } while (continueHandoff) + } + } finally { + if (null != lock) { + lock.unlock() + } + } + } + /** * start new thread to execute stream segment handoff */ def startStreamingHandoffThread( carbonLoadModel: CarbonLoadModel, - sqlContext: SQLContext, - storeLocation: String + sparkSession: SparkSession ): Unit = { // start a new thread to execute streaming segment handoff val handoffThread = new Thread() { override def run(): Unit = { - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val identifier = carbonTable.getAbsoluteTableIdentifier - val tablePath = CarbonStorePath.getCarbonTablePath(identifier) - var continueHandoff = false - // require handoff lock on table - val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK) - try { - if (lock.lockWithRetries()) { - LOGGER.info("Acquired the handoff lock for table" + - s" ${ carbonTable.getDatabaseName }.${ carbonTable.getTableName }") - // handoff streaming segment one by one - do { - val segmentStatusManager = new SegmentStatusManager(identifier) - var loadMetadataDetails: Array[LoadMetadataDetails] = null - // lock table to read table status file - val statusLock = segmentStatusManager.getTableStatusLock - try { - if (statusLock.lockWithRetries()) { - loadMetadataDetails = SegmentStatusManager.readLoadMetadata( - tablePath.getMetadataDirectoryPath) - } - } finally { - if (null != statusLock) { - statusLock.unlock() - } - } - if (null != loadMetadataDetails) { - val streamSegments = - loadMetadataDetails.filter(_.getSegmentStatus == SegmentStatus.STREAMING_FINISH) - - continueHandoff = streamSegments.length > 0 - if (continueHandoff) { - // handoff a streaming segment - val loadMetadataDetail = streamSegments(0) - executeStreamingHandoff( - carbonLoadModel, - sqlContext, - storeLocation, - loadMetadataDetail.getLoadName - ) - } - } else { - continueHandoff = false - } - } while (continueHandoff) - } - } finally { - if (null != lock) { - lock.unlock() - } - } + iterateStreamingHandoff(carbonLoadModel, sparkSession) } } handoffThread.start() @@ -278,8 +284,7 @@ object StreamHandoffRDD { */ def executeStreamingHandoff( carbonLoadModel: CarbonLoadModel, - sqlContext: SQLContext, - storeLocation: String, + sparkSession: SparkSession, handoffSegmenId: String ): Unit = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable @@ -297,7 +302,7 @@ object StreamHandoffRDD { CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false) // convert a streaming segment to columnar segment val status = new StreamHandoffRDD( - sqlContext.sparkContext, + sparkSession.sparkContext, new HandoffResultImpl(), carbonLoadModel, handoffSegmenId).collect() http://git-wip-us.apache.org/repos/asf/carbondata/blob/a51ad30f/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 064ba37..ce9446f 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -45,7 +45,7 @@ import org.apache.carbondata.core.util.path.CarbonStorePath import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.streaming.CarbonStreamException +import org.apache.carbondata.streaming.{CarbonStreamException, StreamHandoffRDD} import org.apache.carbondata.streaming.parser.CarbonStreamParser import org.apache.carbondata.streaming.segment.StreamSegment @@ -80,6 +80,12 @@ class CarbonAppendableStreamSink( CarbonProperties.getInstance().getHandoffSize ) + // auto handoff + private val enableAutoHandoff = hadoopConf.getBoolean( + CarbonCommonConstants.ENABLE_AUTO_HANDOFF, + CarbonProperties.getInstance().isEnableAutoHandoff + ) + override def addBatch(batchId: Long, data: DataFrame): Unit = { if (batchId <= fileLog.getLatest().map(_._1).getOrElse(-1L)) { CarbonAppendableStreamSink.LOGGER.info(s"Skipping already committed batch $batchId") @@ -127,14 +133,18 @@ class CarbonAppendableStreamSink( val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId) val fileType = FileFactory.getFileType(segmentDir) if (segmentMaxSize <= StreamSegment.size(segmentDir)) { - val newSegmentId = - StreamSegment.close(carbonTable, currentSegmentId) + val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId) currentSegmentId = newSegmentId val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId) FileFactory.mkdirs(newSegmentDir, fileType) - } - // TODO trigger hand off operation + // TODO trigger hand off operation + if (enableAutoHandoff) { + StreamHandoffRDD.startStreamingHandoffThread( + carbonLoadModel, + sparkSession) + } + } } }
