Repository: carbondata Updated Branches: refs/heads/master e5e74fc90 -> 6026680a1
[CARBONDATA-1879][Streaming] Support alter table to change the status of the streaming segment. Support new SQL command to change the status of the segment from 'streaming' to 'streaming finish': Alter table <dbname.tablename> finish streaming. This closes #1638. Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6026680a Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6026680a Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6026680a Branch: refs/heads/master Commit: 6026680a13c0245fe2140b2fa7af67f1baf639d7 Parents: e5e74fc Author: QiangCai <[email protected]> Authored: Sun Dec 10 21:01:56 2017 +0800 Committer: Jacky Li <[email protected]> Committed: Thu Dec 14 12:50:11 2017 +0800 ---------------------------------------------------------------------- .../ThriftWrapperSchemaConverterImpl.java | 2 +- .../hadoop/test/util/StoreCreator.java | 2 +- .../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 + .../CarbonAlterTableFinishStreaming.scala | 37 +++++++++++ .../sql/parser/CarbonSpark2SqlParser.scala | 17 ++++- .../TestStreamingTableOperation.scala | 31 +++++++++ .../carbondata/processing/StoreCreator.java | 2 +- .../streaming/segment/StreamSegment.java | 66 ++++++++++++++++++++ 8 files changed, 153 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java index 945a40c..1316a25 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java @@ -635,13 +635,13 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter { schemaEvolutionList.get(schemaEvolutionList.size() - 1).getTime_stamp()); wrapperTableInfo.setDatabaseName(dbName); wrapperTableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName)); - wrapperTableInfo.setTablePath(tablePath); wrapperTableInfo.setFactTable( fromExternalToWrapperTableSchema(externalTableInfo.getFact_table(), tableName)); if (null != externalTableInfo.getDataMapSchemas()) { wrapperTableInfo.setDataMapSchemaList( fromExternalToWrapperChildSchemaList(externalTableInfo.getDataMapSchemas())); } + wrapperTableInfo.setTablePath(tablePath); return wrapperTableInfo; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index ab22945..531bed5 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -186,7 +186,6 @@ public class StoreCreator { public static CarbonTable createTable( AbsoluteTableIdentifier absoluteTableIdentifier) throws IOException { TableInfo tableInfo = new TableInfo(); - tableInfo.setTablePath(absoluteTableIdentifier.getTablePath()); tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); TableSchema tableSchema = new TableSchema(); tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); @@ -278,6 
+277,7 @@ public class StoreCreator { ); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setFactTable(tableSchema); + tableInfo.setTablePath(absoluteTableIdentifier.getTablePath()); CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); String schemaFilePath = carbonTablePath.getSchemaFilePath(); String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index f405902..9b3d969 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -66,6 +66,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { protected val COLS = carbonKeyWord("COLS") protected val COLUMNS = carbonKeyWord("COLUMNS") protected val COMPACT = carbonKeyWord("COMPACT") + protected val FINISH = carbonKeyWord("FINISH") + protected val STREAMING = carbonKeyWord("STREAMING") protected val CREATE = carbonKeyWord("CREATE") protected val CUBE = carbonKeyWord("CUBE") protected val CUBES = carbonKeyWord("CUBES") http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala new file mode 100644 index 0000000..ce83815 --- /dev/null +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.command.management + +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.execution.command.MetadataCommand + +import org.apache.carbondata.streaming.segment.StreamSegment + +/** + * This command will try to change the status of the segment from "streaming" to "streaming finish" + */ +case class CarbonAlterTableFinishStreaming( + dbName: Option[String], + tableName: String) + extends MetadataCommand { + override def processMetadata(sparkSession: SparkSession): Seq[Row] = { + val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession) + StreamSegment.finishStreaming(carbonTable) + Seq.empty + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala index ffa2d32..dad0e3e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.parser import scala.collection.mutable import scala.language.implicitConversions -import org.apache.spark.sql.{DeleteRecords, UpdateTable} +import org.apache.spark.sql.{CarbonEnv, DeleteRecords, UpdateTable} import org.apache.spark.sql.catalyst.{CarbonDDLSqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -71,8 +71,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { protected lazy val start: Parser[LogicalPlan] = explainPlan | startCommand protected lazy val startCommand: Parser[LogicalPlan] 
= - loadManagement|showLoads|alterTable|restructure|updateTable|deleteRecords| - alterPartition|datamapManagement + loadManagement | showLoads | alterTable | restructure | updateTable | deleteRecords | + alterPartition | datamapManagement | alterTableFinishStreaming protected lazy val loadManagement: Parser[LogicalPlan] = deleteLoadsByID | deleteLoadsByLoadDate | cleanFiles | loadDataNew @@ -130,6 +130,17 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser { } /** + * The below syntax is used to change the status of the segment + * from "streaming" to "streaming finish". + * ALTER TABLE tableName FINISH STREAMING + */ + protected lazy val alterTableFinishStreaming: Parser[LogicalPlan] = + ALTER ~> TABLE ~> (ident <~ ".").? ~ ident <~ FINISH <~ STREAMING <~ opt(";") ^^ { + case dbName ~ table => + CarbonAlterTableFinishStreaming(dbName, table) + } + + /** * The syntax of datamap creation is as follows. * CREATE DATAMAP datamapName ON TABLE tableName USING 'DataMapClassName' * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index ad439f2..12a8b8b 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -111,6 +111,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { // 13. handoff streaming segment createTable(tableName = "stream_table_handoff", streaming = true, withBatchLoad = false) + + // 14. 
finish streaming + createTable(tableName = "stream_table_finish", streaming = true, withBatchLoad = true) } test("validate streaming property") { @@ -192,6 +195,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("drop table if exists streaming.stream_table_delete") sql("drop table if exists streaming.stream_table_alter") sql("drop table if exists streaming.stream_table_handoff") + sql("drop table if exists streaming.stream_table_finish") } // normal table not support streaming ingest @@ -721,6 +725,33 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { ) } + test("alter table finish streaming") { + executeStreamingIngest( + tableName = "stream_table_finish", + batchNums = 6, + rowNumsEachBatch = 10000, + intervalOfSource = 5, + intervalOfIngest = 10, + continueSeconds = 40, + generateBadRecords = false, + badRecordAction = "force", + handoffSize = 1024L * 200 + ) + sql("alter table streaming.stream_table_finish finish streaming") + sql("show segments for table streaming.stream_table_finish").show(100, false) + + val segments = sql("show segments for table streaming.stream_table_finish").collect() + assertResult(4)(segments.length) + assertResult("Streaming Finish")(segments(0).getString(1)) + assertResult("Streaming Finish")(segments(1).getString(1)) + assertResult("Streaming Finish")(segments(2).getString(1)) + assertResult("Success")(segments(3).getString(1)) + checkAnswer( + sql("select count(*) from streaming.stream_table_finish"), + Seq(Row(5 + 6 * 10000)) + ) + } + test("do not support creating datamap on streaming table") { assert( intercept[MalformedCarbonCommandException]( http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java 
b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 24b7415..e662757 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -176,7 +176,6 @@ public class StoreCreator { private static CarbonTable createTable() throws IOException { TableInfo tableInfo = new TableInfo(); - tableInfo.setTablePath(absoluteTableIdentifier.getTablePath()); tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); TableSchema tableSchema = new TableSchema(); tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); @@ -263,6 +262,7 @@ public class StoreCreator { ); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setFactTable(tableSchema); + tableInfo.setTablePath(absoluteTableIdentifier.getTablePath()); CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), http://git-wip-us.apache.org/repos/asf/carbondata/blob/6026680a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index d733a74..7a62183 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -29,7 +29,9 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; +import org.apache.carbondata.core.locks.CarbonLockFactory; 
import org.apache.carbondata.core.locks.ICarbonLock; +import org.apache.carbondata.core.locks.LockUsage; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.reader.CarbonIndexFileReader; import org.apache.carbondata.core.statusmanager.FileFormat; @@ -181,6 +183,70 @@ public class StreamSegment { } /** + * change the status of the segment from "streaming" to "streaming finish" + */ + public static void finishStreaming(CarbonTable carbonTable) throws IOException { + ICarbonLock streamingLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), + LockUsage.STREAMING_LOCK); + try { + if (streamingLock.lockWithRetries()) { + ICarbonLock statusLock = CarbonLockFactory.getCarbonLockObj( + carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(), + LockUsage.TABLE_STATUS_LOCK); + try { + if (statusLock.lockWithRetries()) { + LoadMetadataDetails[] details = + SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath()); + boolean updated = false; + for (LoadMetadataDetails detail : details) { + if (SegmentStatus.STREAMING == detail.getSegmentStatus()) { + detail.setLoadEndTime(System.currentTimeMillis()); + detail.setSegmentStatus(SegmentStatus.STREAMING_FINISH); + updated = true; + } + } + if (updated) { + CarbonTablePath tablePath = + CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); + SegmentStatusManager.writeLoadDetailsIntoFile( + tablePath.getTableStatusFilePath(), details); + } + } else { + String msg = "Failed to acquire table status lock of " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName(); + LOGGER.error(msg); + throw new IOException(msg); + } + } finally { + if (statusLock.unlock()) { + LOGGER.info("Table unlocked successfully after table status updation" + + carbonTable.getDatabaseName() + "." 
+ carbonTable.getTableName()); + } else { + LOGGER.error("Unable to unlock Table lock for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + " during table status updation"); + } + } + } else { + String msg = "Failed to finish streaming, because streaming is locked for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName(); + LOGGER.error(msg); + throw new IOException(msg); + } + } finally { + if (streamingLock.unlock()) { + LOGGER.info("Table unlocked successfully after streaming finished" + carbonTable + .getDatabaseName() + "." + carbonTable.getTableName()); + } else { + LOGGER.error("Unable to unlock Table lock for table " + + carbonTable.getDatabaseName() + "." + carbonTable.getTableName() + + " during streaming finished"); + } + } + } + + /** * invoke CarbonStreamOutputFormat to append batch data to existing carbondata file */ public static void appendBatchData(CarbonIterator<Object[]> inputIterators,
