[CARBONDATA-2081] Fixed cache not being refreshed across different sessions and schema not being updated after rename
Scenario 1: Open spark-sql and beeline. Create the main table in spark-sql, then create a pre-aggregate table in beeline. Drop the main table in spark-sql. Perform the 'show tables' operation: the pre-aggregate table is still not deleted. Scenario 2: Perform the following operations in the same session: create table t5 (c1 string, c2 int) stored by 'carbondata'; insert into t5 select 'asd',1; alter table t5 rename to t6; create table t5 (c1 string, c2 int,c3 string) stored by 'carbondata'; insert into t5 select 'asd',1,'sdf' (this query fails). This closes #1862 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c55240d5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c55240d5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c55240d5 Branch: refs/heads/carbonstore Commit: c55240d58ad50e718a89d7bd06cfa229ba1ba82b Parents: 3a6136d Author: rahulforallp <[email protected]> Authored: Thu Jan 25 19:43:22 2018 +0530 Committer: ravipesala <[email protected]> Committed: Fri Jan 26 11:55:36 2018 +0530 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/CarbonEnv.scala | 19 ++++++++++++++- .../schema/CarbonAlterTableRenameCommand.scala | 2 ++ .../spark/sql/hive/CarbonFileMetastore.scala | 2 ++ .../spark/sql/hive/CarbonHiveMetaStore.scala | 4 +++- .../src/main/spark2.2/CarbonSessionState.scala | 25 +++++++------------- .../vectorreader/AddColumnTestCases.scala | 12 ++++++++++ 6 files changed, 45 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 
585fe67..870b1f3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -23,10 +23,11 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.command.preaaggregate._ import org.apache.spark.sql.execution.command.timeseries.TimeSeriesFunction -import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.{HiveSessionCatalog, _} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable @@ -111,6 +112,8 @@ object CarbonEnv { val carbonEnvMap = new ConcurrentHashMap[SparkSession, CarbonEnv] + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + def getInstance(sparkSession: SparkSession): CarbonEnv = { if (sparkSession.isInstanceOf[CarbonSession]) { sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].carbonEnv @@ -154,6 +157,7 @@ object CarbonEnv { databaseNameOp: Option[String], tableName: String) (sparkSession: SparkSession): CarbonTable = { + refreshRelationFromCache(TableIdentifier(tableName, databaseNameOp))(sparkSession) val databaseName = getDatabaseName(databaseNameOp)(sparkSession) val catalog = getInstance(sparkSession).carbonMetastore // refresh cache @@ -168,6 +172,19 @@ object CarbonEnv { .carbonTable) } + def refreshRelationFromCache(identifier: TableIdentifier)(sparkSession: SparkSession): Boolean = { + var isRefreshed = false + val carbonEnv = getInstance(sparkSession) + if (carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) { + 
sparkSession.sessionState.catalog.refreshTable(identifier) + DataMapStoreManager.getInstance(). + clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath, + identifier.database.getOrElse("default"), identifier.table)) + isRefreshed = true + } + isRefreshed + } + /** * Return carbon table instance by looking up table in `sparkSession` */ http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 64d1d6f..dd34f08 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -122,6 +122,8 @@ private[sql] case class CarbonAlterTableRenameCommand( metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] .getClient() + sparkSession.catalog.refreshTable(TableIdentifier(oldTableName, + Some(oldDatabaseName)).quotedString) hiveClient.runSqlHive( s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName") hiveClient.runSqlHive( http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala index 0c52100..b44dc7e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala @@ -221,6 +221,7 @@ class CarbonFileMetastore extends CarbonMetaStore { val schemaConverter = new ThriftWrapperSchemaConverterImpl val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath) + CarbonMetadata.getInstance().removeTable(tableUniqueName) CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo) val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) metadata.carbonTables += carbonTable @@ -258,6 +259,7 @@ class CarbonFileMetastore extends CarbonMetaStore { oldTableIdentifier.getTableId) val path = createSchemaThriftFile(newAbsoluteTableIdentifier, thriftTableInfo) addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier) + path } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index 54f58fc..759471b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -51,13 +51,15 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { absIdentifier: AbsoluteTableIdentifier, sparkSession: SparkSession): CarbonRelation = { val info = CarbonUtil.convertGsonToTableInfo(parameters.asJava) - if (info != null) { + val carbonRelation = if (info != null) { val table = CarbonTable.buildFromTableInfo(info) 
CarbonRelation(info.getDatabaseName, info.getFactTable.getTableName, CarbonSparkUtil.createSparkMeta(table), table) } else { super.createCarbonRelation(parameters, absIdentifier, sparkSession) } + carbonRelation.refresh() + carbonRelation } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index 66a20ea..3c151f0 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -95,18 +95,6 @@ class CarbonSessionCatalog( CarbonEnv.initListeners() - private def refreshRelationFromCache(identifier: TableIdentifier): Boolean = { - var isRefreshed = false - if (carbonEnv.carbonMetastore.checkSchemasModifiedTimeAndReloadTable(identifier)) { - refreshTable(identifier) - DataMapStoreManager.getInstance(). 
- clearDataMaps(AbsoluteTableIdentifier.from(CarbonProperties.getStorePath, - identifier.database.getOrElse("default"), identifier.table)) - logInfo(s"Schema changes have been detected for table: $identifier") - isRefreshed = true - } - isRefreshed - } override def lookupRelation(name: TableIdentifier): LogicalPlan = { @@ -115,17 +103,20 @@ class CarbonSessionCatalog( rtnRelation match { case SubqueryAlias(_, LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _)) => - toRefreshRelation = refreshRelationFromCache(name) + toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession) case LogicalRelation(_: CarbonDatasourceHadoopRelation, _, _) => - toRefreshRelation = refreshRelationFromCache(name) + toRefreshRelation = CarbonEnv.refreshRelationFromCache(name)(sparkSession) case SubqueryAlias(_, relation) if relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") || relation.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") || relation.getClass.getName.equals( "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation") => - val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", - relation).asInstanceOf[CatalogTable] - toRefreshRelation = refreshRelationFromCache(catalogTable.identifier) + val catalogTable = + CarbonReflectionUtils.getFieldOfCatalogTable( + "tableMeta", + relation).asInstanceOf[CatalogTable] + toRefreshRelation = + CarbonEnv.refreshRelationFromCache(catalogTable.identifier)(sparkSession) case _ => } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c55240d5/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala 
b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index 9a87653..d36dd26 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -669,6 +669,18 @@ class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { sql("drop table if exists renameTextFileTable") } + test("test rename [create table, rename, create same table with different schema]"){ + sql("drop table if exists t5") + sql("drop table if exists t6") + + sql("create table t5 (c1 string, c2 int) stored by 'carbondata'") + sql("insert into t5 select 'asd',1") + sql("alter table t5 rename to t6") + sql("create table t5 (c1 string, c2 int,c3 string) stored by 'carbondata'") + sql("insert into t5 select 'asd',1,'sdf'") + checkAnswer(sql("select * from t5"),Seq(Row("asd",1,"sdf"))) + } + override def afterAll { sql("DROP TABLE IF EXISTS addcolumntest") sql("DROP TABLE IF EXISTS hivetable")
