[CARBONDATA-2719] Block update and delete on table having datamaps Table update/delete is needed to block on table which has datamaps.
This close #2483 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/56e7dad7 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/56e7dad7 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/56e7dad7 Branch: refs/heads/carbonstore Commit: 56e7dad7b18b6d5946ccdc49c0d264384225d231 Parents: 84102a2 Author: ndwangsen <[email protected]> Authored: Wed Jul 11 11:52:09 2018 +0800 Committer: xuchuanyin <[email protected]> Committed: Fri Jul 13 09:50:56 2018 +0800 ---------------------------------------------------------------------- .../lucene/LuceneFineGrainDataMapSuite.scala | 8 ++-- .../iud/DeleteCarbonTableTestCase.scala | 44 +++++++++++++++++++ .../TestInsertAndOtherCommandConcurrent.scala | 12 +++-- .../iud/UpdateCarbonTableTestCase.scala | 46 ++++++++++++++++++++ .../spark/sql/hive/CarbonAnalysisRules.scala | 37 +++++++++++++++- 5 files changed, 138 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/56e7dad7/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala index fd55145..657a3eb 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/datamap/lucene/LuceneFineGrainDataMapSuite.scala @@ -641,15 +641,15 @@ class LuceneFineGrainDataMapSuite extends QueryTest with BeforeAndAfterAll { assert(ex4.getMessage.contains("alter table drop column is not supported")) sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE datamap_test7 OPTIONS('header'='false')") - val ex5 = intercept[MalformedCarbonCommandException] { + val ex5 = intercept[UnsupportedOperationException] { sql("UPDATE datamap_test7 d set(d.city)=('luc') where d.name='n10'").show() } - assert(ex5.getMessage.contains("update operation is not supported")) + assert(ex5.getMessage.contains("Update operation is not supported")) - val ex6 = intercept[MalformedCarbonCommandException] { + val ex6 = intercept[UnsupportedOperationException] { sql("delete from datamap_test7 where name = 'n10'").show() } - assert(ex6.getMessage.contains("delete operation is not supported")) + assert(ex6.getMessage.contains("Delete operation is not supported")) } test("test lucene fine grain multiple data map on table") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/56e7dad7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala index 64aae1d..de93229 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala @@ -298,6 +298,50 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { } + test("block deleting records from table which has preaggregate datamap") { + sql("drop table if exists test_dm_main") + sql("drop table if exists test_dm_main_preagg1") + + sql("create table test_dm_main (a string, b string, c string) stored by 'carbondata'") + sql("insert into test_dm_main select 'aaa','bbb','ccc'") + sql("insert into test_dm_main select 'bbb','bbb','ccc'") + sql("insert into test_dm_main select 'ccc','bbb','ccc'") + + sql( + "create datamap preagg1 on table test_dm_main using 'preaggregate' as select" + + " a,sum(b) from test_dm_main group by a") + + assert(intercept[UnsupportedOperationException] { + sql("delete from test_dm_main_preagg1 where test_dm_main_a = 'bbb'") + }.getMessage.contains("Delete operation is not supported for pre-aggregate table")) + assert(intercept[UnsupportedOperationException] { + sql("delete from test_dm_main where a = 'ccc'") + }.getMessage.contains("Delete operation is not supported for tables which have a pre-aggregate table")) + + sql("drop table if exists test_dm_main") + sql("drop table if exists test_dm_main_preagg1") + } + + test("block deleting records from table which has index datamap") { + sql("drop table if exists test_dm_index") + + sql("create table test_dm_index (a string, b string, c string) stored by 'carbondata'") + sql("insert into test_dm_index select 'ccc','bbb','ccc'") + + sql( + s""" + | CREATE DATAMAP dm_test_dm_index ON TABLE test_dm_index + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='a', 'BLOOM_SIZE'='640000') + """.stripMargin) + + assert(intercept[UnsupportedOperationException] { + sql("delete from test_dm_index where a = 'ccc'") + }.getMessage.contains("Delete operation is not supported for table which has index datamaps")) + + sql("drop table if exists test_dm_index") + } + override def afterAll { sql("use default") sql("drop database if exists iud_db cascade") http://git-wip-us.apache.org/repos/asf/carbondata/blob/56e7dad7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index dbbcd4d..c079529 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -134,7 +134,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA "insert overwrite is in progress for table default.orders, compaction operation is not allowed")) } - test("update should fail if insert overwrite is in progress") { + // block updating records from table which has index datamap. see PR2483 + ignore("update should fail if insert overwrite is in progress") { val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite") val ex = intercept[ConcurrentOperationException] { sql("update orders set (o_country)=('newCountry') where o_country='china'").show @@ -144,7 +145,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA "loading is in progress for table default.orders, data update operation is not allowed")) } - test("delete should fail if insert overwrite is in progress") { + // block deleting records from table which has index datamap. see PR2483 + ignore("delete should fail if insert overwrite is in progress") { val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite") val ex = intercept[ConcurrentOperationException] { sql("delete from orders where o_country='china'").show @@ -235,7 +237,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA sql("drop table t1") } - test("update should fail if insert is in progress") { + // block updating records from table which has index datamap. see PR2483 + ignore("update should fail if insert is in progress") { val future = runSqlAsync("insert into table orders select * from orders_overwrite") val ex = intercept[ConcurrentOperationException] { sql("update orders set (o_country)=('newCountry') where o_country='china'").show @@ -245,7 +248,8 @@ class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterA "loading is in progress for table default.orders, data update operation is not allowed")) } - test("delete should fail if insert is in progress") { + // block deleting records from table which has index datamap. see PR2483 + ignore("delete should fail if insert is in progress") { val future = runSqlAsync("insert into table orders select * from orders_overwrite") val ex = intercept[ConcurrentOperationException] { sql("delete from orders where o_country='china'").show http://git-wip-us.apache.org/repos/asf/carbondata/blob/56e7dad7/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index 2432715..2cb2717 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -713,6 +713,52 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { sql("drop table if exists senten") } + test("block updating table which has preaggreate datamap") { + sql("use iud") + sql("drop table if exists test_dm_main") + sql("drop table if exists test_dm_main_preagg1") + + sql("create table test_dm_main (a string, b string, c string) stored by 'carbondata'") + sql("insert into test_dm_main select 'aaa','bbb','ccc'") + sql("insert into test_dm_main select 'bbb','bbb','ccc'") + sql("insert into test_dm_main select 'ccc','bbb','ccc'") + + sql( + "create datamap preagg1 on table test_dm_main using 'preaggregate' as select" + + " a,sum(b) from test_dm_main group by a") + + assert(intercept[UnsupportedOperationException] { + sql("update test_dm_main_preagg1 set(test_dm_main_a) = ('aaa') where test_dm_main_a = 'bbb'") + }.getMessage.contains("Update operation is not supported for pre-aggregate table")) + assert(intercept[UnsupportedOperationException] { + sql("update test_dm_main set(a) = ('aaa') where a = 'ccc'") + }.getMessage.contains("Update operation is not supported for tables which have a pre-aggregate table")) + + sql("drop table if exists test_dm_main") + sql("drop table if exists test_dm_main_preagg1") + } + + test("block updating table which has index datamap") { + sql("use iud") + sql("drop table if exists test_dm_index") + + sql("create table test_dm_index (a string, b string, c string) stored by 'carbondata'") + sql("insert into test_dm_index select 'ccc','bbb','ccc'") + + sql( + s""" + | CREATE DATAMAP dm_test_dm_index ON TABLE test_dm_index + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='a', 'BLOOM_SIZE'='640000') + """.stripMargin) + + assert(intercept[UnsupportedOperationException] { + sql("update test_dm_index set(a) = ('aaa') where a = 'ccc'") + }.getMessage.contains("Update operation is not supported for table which has index datamaps")) + + sql("drop table if exists test_dm_index") + } + override def afterAll { sql("use default") sql("drop database if exists iud cascade") http://git-wip-us.apache.org/repos/asf/carbondata/blob/56e7dad7/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala index b2f4505..dc8930e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala @@ -35,6 +35,8 @@ import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.CarbonReflectionUtils import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager +import org.apache.carbondata.core.util.CarbonUtil case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] { @@ -55,7 +57,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica Seq.empty, isDistinct = false), "tupleId")()) val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId) - + val carbonTable = CarbonEnv.getCarbonTable(table.tableIdentifier)(sparkSession) + if (carbonTable != null) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { + throw new UnsupportedOperationException( + "Update operation is not supported for tables which have a pre-aggregate table. " + + "Drop pre-aggregate tables to continue.") + } + if (carbonTable.isChildDataMap) { + throw new UnsupportedOperationException( + "Update operation is not supported for pre-aggregate table") + } + val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable) + if (!indexSchemas.isEmpty) { + throw new UnsupportedOperationException( + "Update operation is not supported for table which has index datamaps") + } + } val tableRelation = if (SPARK_VERSION.startsWith("2.1")) { relation } else if (SPARK_VERSION.startsWith("2.2")) { @@ -170,6 +188,23 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica Seq.empty, isDistinct = false), "tupleId")()) val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId) + val carbonTable = CarbonEnv.getCarbonTable(table.tableIdentifier)(sparkSession) + if (carbonTable != null) { + if (CarbonUtil.hasAggregationDataMap(carbonTable)) { + throw new UnsupportedOperationException( + "Delete operation is not supported for tables which have a pre-aggregate table. " + + "Drop pre-aggregate tables to continue.") + } + if (carbonTable.isChildDataMap) { + throw new UnsupportedOperationException( + "Delete operation is not supported for pre-aggregate table") + } + val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable) + if (!indexSchemas.isEmpty) { + throw new UnsupportedOperationException( + "Delete operation is not supported for table which has index datamaps") + } + } // include tuple id in subquery if (SPARK_VERSION.startsWith("2.1")) { Project(projList, relation)
