[CARBONDATA-2693][BloomDataMap] Fix bug in ALTER TABLE RENAME when renaming an existing table on which a bloomfilter datamap exists
Fix bug for alter rename is renaming the existing table on which bloom filter datamap exists This closes #2452 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cdee81d4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cdee81d4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cdee81d4 Branch: refs/heads/carbonstore Commit: cdee81d4dbdd22ed728f2bd898ed888ed25a8774 Parents: 75a602d Author: ndwangsen <luffy.w...@huawei.com> Authored: Thu Jul 5 16:51:39 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Sun Jul 15 21:44:53 2018 +0800 ---------------------------------------------------------------------- .../core/datamap/DataMapStoreManager.java | 51 ++++- .../core/metadata/schema/table/CarbonTable.java | 24 +++ .../table/DiskBasedDMSchemaStorageProvider.java | 3 +- .../bloom/BloomCoarseGrainDataMapFactory.java | 21 +- .../TestRenameTableWithDataMap.scala | 192 +++++++++++++++++++ .../preaaggregate/PreAggregateListeners.scala | 5 - .../schema/CarbonAlterTableRenameCommand.scala | 22 ++- 7 files changed, 306 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdee81d4/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java index 9a7d1c1..8ce302b 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java @@ -50,6 +50,9 @@ import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonSessionInfo; import 
org.apache.carbondata.core.util.ThreadLocalSessionInfo; +import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV; +import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.PREAGGREGATE; + /** * It maintains all the DataMaps in it. */ @@ -125,9 +128,8 @@ public final class DataMapStoreManager { if (dataMapSchemas != null) { for (DataMapSchema dataMapSchema : dataMapSchemas) { RelationIdentifier identifier = dataMapSchema.getParentTables().get(0); - if (dataMapSchema.isIndexDataMap() && identifier.getTableName() - .equals(carbonTable.getTableName()) && identifier.getDatabaseName() - .equals(carbonTable.getDatabaseName())) { + if (dataMapSchema.isIndexDataMap() && identifier.getTableId() + .equals(carbonTable.getTableId())) { dataMaps.add(getDataMap(carbonTable, dataMapSchema)); } } @@ -173,6 +175,49 @@ public final class DataMapStoreManager { } /** + * Update the datamap schema after table rename + * This should be invoked after changing table name + * @param dataMapSchemaList + * @param newTableName + */ + public void updateDataMapSchema(List<DataMapSchema> dataMapSchemaList, + String newTableName) throws IOException { + List<DataMapSchema> newDataMapSchemas = new ArrayList<>(); + for (DataMapSchema dataMapSchema : dataMapSchemaList) { + RelationIdentifier relationIdentifier = dataMapSchema.getRelationIdentifier(); + String dataBaseName = relationIdentifier.getDatabaseName(); + String tableId = relationIdentifier.getTableId(); + String providerName = dataMapSchema.getProviderName(); + // if the preaggregate datamap,not be modified the schema + if (providerName.equalsIgnoreCase(PREAGGREGATE.toString())) { + continue; + } + // if the mv datamap,not be modified the relationIdentifier + if (!providerName.equalsIgnoreCase(MV.toString())) { + RelationIdentifier newRelationIdentifier = new RelationIdentifier(dataBaseName, + newTableName, tableId); + dataMapSchema.setRelationIdentifier(newRelationIdentifier); + } 
+ List<RelationIdentifier> newParentTables = new ArrayList<>(); + List<RelationIdentifier> parentTables = dataMapSchema.getParentTables(); + for (RelationIdentifier identifier : parentTables) { + RelationIdentifier newParentTableIdentifier = new RelationIdentifier( + identifier.getDatabaseName(), newTableName, identifier.getTableId()); + newParentTables.add(newParentTableIdentifier); + } + dataMapSchema.setParentTables(newParentTables); + newDataMapSchemas.add(dataMapSchema); + // first drop the old schema; the updated schema is saved after the loop + String dataMapName = dataMapSchema.getDataMapName(); + dropDataMapSchema(dataMapName); + } + // save new datamap schema to storage + for (DataMapSchema newDataMapSchema : newDataMapSchemas) { + saveDataMapSchema(newDataMapSchema); + } + } + + /** + * Register datamap catalog for the datamap provider + * @param dataMapProvider + * @param dataMapSchema http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdee81d4/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java index a6936b6..ffdd6b3 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java @@ -60,6 +60,7 @@ import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.core.util.path.CarbonTablePath; +import static org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider.MV; import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema; /** @@ -466,6 +467,13 @@ public class CarbonTable implements Serializable { } /** + * @return the tableId + */ + public String getTableId() { + return
tableInfo.getFactTable().getTableId(); + } + + /** * @return the tableUniqueName */ public String getTableUniqueName() { @@ -1229,4 +1237,20 @@ public class CarbonTable implements Serializable { } return minMaxCachedColsList; } + + /** + * Returns true if an MV datamap is present on the specified table + * @param carbonTable the table whose datamap schemas are checked + * @return true if any datamap schema of the table uses the MV provider + */ + public static boolean hasMVDataMap(CarbonTable carbonTable) throws IOException { + List<DataMapSchema> dataMapSchemaList = DataMapStoreManager.getInstance() + .getDataMapSchemasOfTable(carbonTable); + for (DataMapSchema dataMapSchema : dataMapSchemaList) { + if (dataMapSchema.getProviderName().equalsIgnoreCase(MV.toString())) { + return true; + } + } + return false; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdee81d4/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java index f90960b..7c1be26 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DiskBasedDMSchemaStorageProvider.java @@ -109,8 +109,7 @@ public class DiskBasedDMSchemaStorageProvider implements DataMapSchemaStoragePro for (DataMapSchema dataMapSchema : this.dataMapSchemas) { List<RelationIdentifier> parentTables = dataMapSchema.getParentTables(); for (RelationIdentifier identifier : parentTables) { - if (identifier.getTableName().equalsIgnoreCase(carbonTable.getTableName()) && - 
identifier.getDatabaseName().equalsIgnoreCase(carbonTable.getDatabaseName())) { + if (identifier.getTableId().equalsIgnoreCase(carbonTable.getTableId())) {
dataMapSchemas.add(dataMapSchema); break; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdee81d4/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java ---------------------------------------------------------------------- diff --git a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java index 6183077..0fe0175 100644 --- a/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java +++ b/datamap/bloom/src/main/java/org/apache/carbondata/datamap/bloom/BloomCoarseGrainDataMapFactory.java @@ -378,7 +378,26 @@ public class BloomCoarseGrainDataMapFactory extends DataMapFactory<CoarseGrainDa } @Override public boolean willBecomeStale(TableOperation operation) { - return false; + switch (operation) { + case ALTER_RENAME: + return false; + case ALTER_DROP: + return true; + case ALTER_ADD_COLUMN: + return true; + case ALTER_CHANGE_DATATYPE: + return true; + case STREAMING: + return true; + case DELETE: + return true; + case UPDATE: + return true; + case PARTITION: + return true; + default: + return false; + } } @Override http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdee81d4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithDataMap.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithDataMap.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithDataMap.scala new file mode 100644 index 0000000..18fb28f --- /dev/null +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestRenameTableWithDataMap.scala 
@@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.carbondata.spark.testsuite.createTable + +import org.apache.spark.sql.test.util.QueryTest +import org.scalatest.BeforeAndAfterAll + +/** + * test functionality for alter table with datamap + */ +class TestRenameTableWithDataMap extends QueryTest with BeforeAndAfterAll { + + val smallFile = s"$resourcesPath/sample.csv" + + override def beforeAll { + sql("DROP TABLE IF EXISTS carbon_table") + sql("DROP TABLE IF EXISTS carbon_tb") + sql("DROP TABLE IF EXISTS fact_table1") + } + + test("Creating a bloomfilter datamap,then table rename") { + sql( + s""" + | CREATE TABLE carbon_table( + | id INT, name String, city String, age INT + | ) + | STORED BY 'carbondata' + """.stripMargin) + + sql( + s""" + | CREATE DATAMAP dm_carbon_table_name ON TABLE carbon_table + | USING 'bloomfilter' + | DMProperties('INDEX_COLUMNS'='name,city', 'BLOOM_SIZE'='640000') + """.stripMargin) + + (1 to 2).foreach { i => + + sql( + s""" + | insert into carbon_table select 5,'bb','beijing',21 + | """.stripMargin) + + sql( + s""" + | insert into carbon_table select 6,'cc','shanghai','29' + | """.stripMargin) + sql( + s""" + | LOAD DATA LOCAL INPATH '$smallFile' 
INTO TABLE carbon_table + | OPTIONS('header'='false') + """.stripMargin) + } + + sql( + s""" + | show datamap on table carbon_table + """.stripMargin).show(false) + + sql( + s""" + | select * from carbon_table where name='eason' + """.stripMargin).show(false) + + sql( + s""" + | explain select * from carbon_table where name='eason' + """.stripMargin).show(false) + + sql( + s""" + | alter TABLE carbon_table rename to carbon_tb + """.stripMargin) + + sql( + s""" + | show datamap on table carbon_tb + """.stripMargin).show(false) + + sql( + s""" + | select * from carbon_tb where name='eason' + """.stripMargin).show(false) + + sql( + s""" + | explain select * from carbon_tb where name='eason' + """.stripMargin).show(false) + } + + test("Creating a preaggregate datamap,then table rename") { + sql( + """ + | CREATE TABLE fact_table1 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table1 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + sql( + s""" + | CREATE DATAMAP dm_agg_age ON TABLE fact_table1 + | USING 'preaggregate' + | AS + | select deptno,deptname,sum(salary) from fact_table1 + | group by deptno,deptname + """.stripMargin) + + sql( + s""" + | show datamap on table fact_table1 + """.stripMargin).show(false) + sql( + s""" + | select deptno,deptname,sum(salary) from fact_table1 + | group by deptno,deptname + """.stripMargin).show(false) + + sql( + s""" + | explain select deptno,deptname,sum(salary) from fact_table1 + | group by deptno,deptname + """.stripMargin).show(false) + + val 
exception_tb_rename: Exception = intercept[Exception] { + sql( + s""" + | alter TABLE fact_table1 rename to fact_tb + """.stripMargin) + } + assert(exception_tb_rename.getMessage + .contains("Rename operation is not supported for table" + + " with pre-aggregate tables")) + } + + /* + * mv datamap does not support running here, now must run in mv project. + test("Creating a mv datamap,then table rename") { + sql( + """ + | CREATE TABLE fact_table2 (empname String, designation String, doj Timestamp, + | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, + | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, + | utilization int,salary int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table2 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""") + + sql("drop datamap if exists datamap1") + sql("create datamap datamap1 using 'mv' as select empname, designation from fact_table2") + sql(s"rebuild datamap datamap1") + + sql( + s""" + | show datamap on table fact_table2 + """.stripMargin).show(false) + + val exception_tb_rename: Exception = intercept[Exception] { + sql( + s""" + | alter TABLE fact_table2 rename to fact_tb2 + """.stripMargin) + } + assert(exception_tb_rename.getMessage + .contains("alter rename is not supported for mv datamap")) + } */ + + override def afterAll: Unit = { + sql("DROP TABLE IF EXISTS carbon_table") + sql("DROP TABLE IF EXISTS carbon_tb") + sql("DROP TABLE IF EXISTS fact_table1") + } +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdee81d4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala index 2fb2902..b33652f 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala @@ -802,11 +802,6 @@ object RenameTablePreListener extends OperationEventListener { throw new UnsupportedOperationException( "Rename operation is not supported for table with pre-aggregate tables") } - val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable) - if (!indexSchemas.isEmpty) { - throw new UnsupportedOperationException( - "Rename operation is not supported for table with datamaps") - } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/cdee81d4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 0e3033e..f1e17bd 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command.schema +import java.util + import org.apache.spark.sql.{CarbonEnv, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition @@ -32,7 +34,7 @@ 
import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.features.TableOperation import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.CarbonTableIdentifier -import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema} import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus} import org.apache.carbondata.format.SchemaEvolutionEntry @@ -81,6 +83,10 @@ private[sql] case class CarbonAlterTableRenameCommand( if (!oldCarbonTable.canAllow(oldCarbonTable, TableOperation.ALTER_RENAME)) { throw new MalformedCarbonCommandException("alter rename is not supported for index datamap") } + // if table have create mv datamap, not support table rename + if (CarbonTable.hasMVDataMap(oldCarbonTable)) { + throw new MalformedCarbonCommandException("alter rename is not supported for mv datamap") + } var timeStamp = 0L var carbonTable: CarbonTable = null @@ -94,6 +100,15 @@ private[sql] case class CarbonAlterTableRenameCommand( if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) { throw new ConcurrentOperationException(carbonTable, "loading", "alter table rename") } + // get the old table all data map schema + val dataMapSchemaList: util.List[DataMapSchema] = new util.ArrayList[DataMapSchema]() + if (carbonTable.hasDataMapSchema) { + dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList) + } + val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable) + if (!indexSchemas.isEmpty) { + dataMapSchemaList.addAll(indexSchemas) + } // invalid data map for the old table, see CARBON-1690 val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier 
DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier) @@ -135,6 +150,11 @@ private[sql] case class CarbonAlterTableRenameCommand( schemaEvolutionEntry, carbonTable.getTablePath)(sparkSession) + // Update the storage location with datamap schema + if (!dataMapSchemaList.isEmpty) { + DataMapStoreManager.getInstance(). + updateDataMapSchema(dataMapSchemaList, newTableName) + } val alterTableRenamePostEvent: AlterTableRenamePostEvent = AlterTableRenamePostEvent( carbonTable, alterTableRenameModel,