[CARBONDATA-1954] [Pre-Aggregate] CarbonHiveMetastore updated while dropping the Pre-Aggregate table & code refactored
1. A similar function to update CarbonHiveMetastore was already there. Removed the duplicate function definition and updated the caller. 2. Code refactored so that during dropping a pre-aggregate table only metadata will be deleted if processMetadata() is called. This closes #1743 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d33d3473 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d33d3473 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d33d3473 Branch: refs/heads/branch-1.3 Commit: d33d3473e53fed913a9794e150050f2007acde63 Parents: 1b72a02 Author: rahulforallp <[email protected]> Authored: Fri Dec 29 20:01:45 2017 +0530 Committer: ravipesala <[email protected]> Committed: Mon Jan 8 22:28:29 2018 +0530 ---------------------------------------------------------------------- .../testsuite/datamap/TestDataMapCommand.scala | 54 +++++++++++++++ .../org/apache/spark/sql/CarbonSession.scala | 3 - .../datamap/CarbonDropDataMapCommand.scala | 19 +++++- .../command/datamap/DataMapListeners.scala | 72 -------------------- .../command/table/CarbonDropTableCommand.scala | 38 ++++++++++- .../spark/sql/hive/CarbonHiveMetaStore.scala | 23 +------ 6 files changed, 109 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/d33d3473/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index d61971e..c38e6cf 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -20,7 +20,9 @@ package org.apache.carbondata.spark.testsuite.datamap import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll +import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.CarbonMetadata +import org.apache.carbondata.core.util.CarbonProperties class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { @@ -76,6 +78,58 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { assert(dataMapSchemaList.get(2).getChildSchema.getTableName.equals("datamaptest_datamap3")) } + test("check hivemetastore after drop datamap") { + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + "true") + sql("drop datamap if exists datamap_hiveMetaStoreTable on table hiveMetaStoreTable") + sql("drop table if exists hiveMetaStoreTable") + sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'") + + sql( + "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable") + checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable") + + sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable") + checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable") + + } + finally { + sql("drop table hiveMetaStoreTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } + } + + test("drop the table having pre-aggregate"){ + try { + 
CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + "true") + sql("drop datamap if exists datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1") + sql("drop table if exists hiveMetaStoreTable_1") + sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'") + + sql( + "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable_1") + + checkExistence(sql("show datamap on table hiveMetaStoreTable_1"), + true, + "datamap_hiveMetaStoreTable_1") + + sql("drop table hiveMetaStoreTable_1") + + checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1") + } + finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } + } + test("test datamap create with preagg with duplicate name") { intercept[Exception] { sql( http://git-wip-us.apache.org/repos/asf/carbondata/blob/d33d3473/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala index 7ee3434..34e37c5 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala @@ -24,7 +24,6 @@ import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.SparkSession.Builder -import org.apache.spark.sql.execution.command.datamap.{DataMapDropTablePostListener, DropDataMapPostListener} import 
org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.streaming.CarbonStreamingQueryListener import org.apache.spark.sql.hive.execution.command.CarbonSetCommand @@ -250,7 +249,6 @@ object CarbonSession { def initListeners(): Unit = { OperationListenerBus.getInstance() - .addListener(classOf[DropTablePostEvent], DataMapDropTablePostListener) .addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener) .addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener) .addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener) @@ -261,7 +259,6 @@ object CarbonSession { .addListener(classOf[AlterTableRenamePreEvent], PreAggregateRenameTablePreListener) .addListener(classOf[AlterTableDataTypeChangePreEvent], PreAggregateDataTypeChangePreListener) .addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener) - .addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener) .addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener) .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent], AlterPreAggregateTableCompactionPostListener) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d33d3473/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala index 7f68b05..59aa322 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala @@ -25,16 +25,15 @@ import 
org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil +import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand import org.apache.spark.sql.hive.CarbonRelation import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} -import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl import org.apache.carbondata.core.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events._ @@ -103,6 +102,16 @@ case class CarbonDropDataMapCommand( carbonTable.get.getTableInfo, dbName, tableName))(sparkSession) + if (dataMapSchema.isDefined) { + if (dataMapSchema.get._1.getRelationIdentifier != null) { + CarbonDropTableCommand( + ifExistsSet = true, + Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName), + dataMapSchema.get._1.getRelationIdentifier.getTableName, + dropChildTable = true + ).processMetadata(sparkSession) + } + } // fires the event after dropping datamap from main table schema val dropDataMapPostEvent = DropDataMapPostEvent( @@ -136,6 +145,12 @@ case class CarbonDropDataMapCommand( // delete the table folder val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession) DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName) + CarbonDropTableCommand( + ifExistsSet = true, + databaseNameOp, + dataMapName, + dropChildTable = true + ).processData(sparkSession) Seq.empty } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d33d3473/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala deleted file mode 100644 index d37ca0a..0000000 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.sql.execution.command.datamap - -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.execution.command.AtomicRunnableCommand -import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand - -import org.apache.carbondata.events.{DropDataMapPostEvent, DropTablePostEvent, Event, OperationContext, OperationEventListener} - -object DataMapDropTablePostListener extends OperationEventListener { - - /** - * Called on DropTablePostEvent occurrence - */ - override def onEvent(event: Event, operationContext: OperationContext): Unit = { - val dropPostEvent = event.asInstanceOf[DropTablePostEvent] - val carbonTable = dropPostEvent.carbonTable - val sparkSession = dropPostEvent.sparkSession - if (carbonTable.hasDataMapSchema) { - // drop all child tables - val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList - childSchemas.asScala - .filter(_.getRelationIdentifier != null) - .foreach { childSchema => - CarbonDropTableCommand( - ifExistsSet = true, - Some(childSchema.getRelationIdentifier.getDatabaseName), - childSchema.getRelationIdentifier.getTableName, - dropChildTable = true - ).run(sparkSession) - } - } - } -} - -object DropDataMapPostListener extends OperationEventListener { - - /** - * Called on DropDataMapPostEvent occurrence - */ - override def onEvent(event: Event, operationContext: OperationContext): Unit = { - val dropPostEvent = event.asInstanceOf[DropDataMapPostEvent] - val dataMapSchema = dropPostEvent.dataMapSchema - val sparkSession = dropPostEvent.sparkSession - if (dataMapSchema.isDefined) { - if (dataMapSchema.get.getRelationIdentifier != null) { - CarbonDropTableCommand( - ifExistsSet = true, - Some(dataMapSchema.get.getRelationIdentifier.getDatabaseName), - dataMapSchema.get.getRelationIdentifier.getTableName, - dropChildTable = true - ).run(sparkSession) - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d33d3473/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index aaad207..312e8b0 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -17,9 +17,11 @@ package org.apache.spark.sql.execution.command.table +import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.spark.sql.util.CarbonException @@ -32,7 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ -import org.apache.carbondata.spark.util.CommonUtil case class CarbonDropTableCommand( ifExistsSet: Boolean, @@ -42,6 +43,7 @@ case class CarbonDropTableCommand( extends AtomicRunnableCommand { var carbonTable: CarbonTable = _ + var childTables : Seq[CarbonTable] = Seq.empty override def processMetadata(sparkSession: SparkSession): Seq[Row] = { val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -83,6 +85,27 @@ case class CarbonDropTableCommand( } 
CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession) + if (carbonTable.hasDataMapSchema) { + // drop all child tables + val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList + + childTables = childSchemas.asScala + .filter(_.getRelationIdentifier != null) + .map { childSchema => + val childTable = + CarbonEnv.getCarbonTable( + TableIdentifier(childSchema.getRelationIdentifier.getTableName, + Some(childSchema.getRelationIdentifier.getDatabaseName)))(sparkSession) + CarbonDropTableCommand( + ifExistsSet = true, + Some(childSchema.getRelationIdentifier.getDatabaseName), + childSchema.getRelationIdentifier.getTableName, + dropChildTable = true + ).processMetadata(sparkSession) + childTable + } + } + // fires the event after dropping main table val dropTablePostEvent: DropTablePostEvent = DropTablePostEvent( @@ -123,6 +146,19 @@ case class CarbonDropTableCommand( val file = FileFactory.getCarbonFile(tablePath, fileType) CarbonUtil.deleteFoldersAndFilesSilent(file) } + if (carbonTable.hasDataMapSchema && childTables.nonEmpty) { + // drop all child tables + childTables.foreach { childTable => + val carbonDropCommand = CarbonDropTableCommand( + ifExistsSet = true, + Some(childTable.getDatabaseName), + childTable.getTableName, + dropChildTable = true + ) + carbonDropCommand.carbonTable = childTable + carbonDropCommand.processData(sparkSession) + } + } } Seq.empty } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d33d3473/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index d5ac5ae..a2d1064 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -146,7 +146,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { thriftTableInfo: org.apache.carbondata.format.TableInfo, carbonTablePath: String)(sparkSession: SparkSession): String = { val schemaConverter = new ThriftWrapperSchemaConverterImpl - updateHiveMetaStoreForDataMap(newTableIdentifier, + updateHiveMetaStoreForAlter(newTableIdentifier, oldTableIdentifier, thriftTableInfo, carbonTablePath, @@ -180,27 +180,6 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath } - private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier, - oldTableIdentifier: CarbonTableIdentifier, - thriftTableInfo: format.TableInfo, - tablePath: String, - sparkSession: SparkSession, - schemaConverter: ThriftWrapperSchemaConverterImpl) = { - val newTablePath = - CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier.getTableName) - val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo( - thriftTableInfo, - newTableIdentifier.getDatabaseName, - newTableIdentifier.getTableName, - newTablePath) - val dbName = oldTableIdentifier.getDatabaseName - val tableName = oldTableIdentifier.getTableName - sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString) - removeTableFromMetadata(dbName, tableName) - CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) - newTablePath - } - /** * This method will is used to remove the evolution entry in case of failure. *
