[CARBONDATA-2010] Block streaming on main table of preaggregate datamap If the table has 'preaggregate' DataMap, it doesn't support streaming now
This closes #1791 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fc81d831 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fc81d831 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fc81d831 Branch: refs/heads/fgdatamap Commit: fc81d831cdc608435104d27eb2c39a57f66c80ed Parents: 943588d Author: QiangCai <[email protected]> Authored: Thu Jan 11 15:04:22 2018 +0800 Committer: Jacky Li <[email protected]> Committed: Mon Jan 15 14:43:58 2018 +0800 ---------------------------------------------------------------------- .../core/metadata/schema/table/CarbonTable.java | 15 +++++++++++++++ .../spark/sql/execution/strategy/DDLStrategy.scala | 10 ++++++++++ .../carbondata/TestStreamingTableOperation.scala | 12 ++++++++++++ 3 files changed, 37 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc81d831/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index e27b126..74dfef6 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -733,6 +733,21 @@ public class CarbonTable implements Serializable { return streaming != null && streaming.equalsIgnoreCase("true"); } + /** + * whether this table has aggregation DataMap or not + */ + public boolean hasAggregationDataMap() { + List<DataMapSchema> dataMapSchemaList = tableInfo.getDataMapSchemaList(); + if (dataMapSchemaList != null && !dataMapSchemaList.isEmpty()) { + for (DataMapSchema dataMapSchema : dataMapSchemaList) { + if (dataMapSchema instanceof AggregationDataMapSchema) { + return true; + } + } + } + return false; + } + public int getDimensionOrdinalMax() { return dimensionOrdinalMax; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc81d831/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala index 6ff762a..f058e96 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala @@ -218,6 +218,16 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy { case AlterTableSetPropertiesCommand(tableName, properties, isView) if CarbonEnv.getInstance(sparkSession).carbonMetastore .tableExists(tableName)(sparkSession) => { + + // TODO remove this limiation after streaming table support 'preaggregate' DataMap + // if the table has 'preaggregate' DataMap, it doesn't support streaming now + val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore + .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable + if (carbonTable.hasAggregationDataMap) { + throw new MalformedCarbonCommandException( + "The table has 'preaggregate' DataMap, it doesn't support streaming") + } + // TODO remove this limitation later val property = properties.find(_._1.equalsIgnoreCase("streaming")) if (property.isDefined) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/fc81d831/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index a8ab6fb..62076bf 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -128,6 +128,9 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { // 18. block drop table while streaming is in progress createTable(tableName = "stream_table_drop", streaming = true, withBatchLoad = false) + + // 19. block streaming on 'preaggregate' main table + createTable(tableName = "agg_table_block", streaming = false, withBatchLoad = false) } test("validate streaming property") { @@ -216,6 +219,8 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { sql("drop table if exists streaming.stream_table_close_auto_handoff") sql("drop table if exists streaming.stream_table_reopen") sql("drop table if exists streaming.stream_table_drop") + sql("drop table if exists streaming.agg_table_block") + sql("drop table if exists streaming.agg_table_block_agg0") } // normal table not support streaming ingest @@ -995,6 +1000,13 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { assertResult(1)(resultStreaming.length) assertResult("true")(resultStreaming(0).getString(1).trim) } + + test("block streaming for 'preaggregate' table") { + sql("create datamap agg_table_block_agg0 on table streaming.agg_table_block using 'preaggregate' as select city, count(name) from streaming.agg_table_block group by city") + val msg = intercept[MalformedCarbonCommandException](sql("ALTER TABLE streaming.agg_table_block SET TBLPROPERTIES('streaming'='true')")) + assertResult("The table has 'preaggregate' DataMap, it doesn't support streaming")(msg.getMessage) + } + def createWriteSocketThread( serverSocket: ServerSocket, writeNums: Int,
