Repository: carbondata Updated Branches: refs/heads/master d2e70a464 -> 11b94dd13
[CARBONDATA-1345] Update tablemeta cache after table schema has been changed. Refresh the tablemeta cache when the table schema has been changed. Since HiveSessionState.lookupRelation is slow (especially in concurrent query scenarios), don't call this method when the table schema has not been changed. This closes #1217 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/11b94dd1 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/11b94dd1 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/11b94dd1 Branch: refs/heads/master Commit: 11b94dd1307c9032b62031053733ddc20e34011c Parents: d2e70a4 Author: xuchuanyin <[email protected]> Authored: Mon Jul 31 12:06:51 2017 +0800 Committer: Ravindra Pesala <[email protected]> Committed: Wed Aug 2 10:55:18 2017 +0530 ---------------------------------------------------------------------- .../spark/sql/hive/CarbonSessionState.scala | 34 ++++++++++++-------- 1 file changed, 21 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/11b94dd1/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala index 2dc3ca3..1fe6c83 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala @@ -81,34 +81,42 @@ class CarbonSessionCatalog( override def lookupRelation(name: TableIdentifier, alias: Option[String]): LogicalPlan = { val rtnRelation = super.lookupRelation(name, alias) + var toRefreshRelation = false rtnRelation match { case SubqueryAlias(_, 
LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _), _) => - refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) + toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) case LogicalRelation(carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation, _, _) => - refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) - case relation => relation + toRefreshRelation = refreshRelationFromCache(name, alias, carbonDatasourceHadoopRelation) + case _ => } - rtnRelation + if (toRefreshRelation) { + super.lookupRelation(name, alias) + } else { + rtnRelation + } } private def refreshRelationFromCache(name: TableIdentifier, alias: Option[String], - carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Unit = { + carbonDatasourceHadoopRelation: CarbonDatasourceHadoopRelation): Boolean = { + var isRefreshed = false carbonEnv.carbonMetastore. checkSchemasModifiedTimeAndReloadTables(CarbonEnv.getInstance(sparkSession).storePath) - carbonEnv.carbonMetastore + + val tableMeta = carbonEnv.carbonMetastore .getTableFromMetadataCache(carbonDatasourceHadoopRelation.carbonTable.getDatabaseName, - carbonDatasourceHadoopRelation.carbonTable.getFactTableName) match { - case tableMeta: TableMeta => - if (tableMeta.carbonTable.getTableLastUpdatedTime != - carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) { - refreshTable(name) - } - case _ => + carbonDatasourceHadoopRelation.carbonTable.getFactTableName) + if (tableMeta.isDefined && + tableMeta.get.carbonTable.getTableLastUpdatedTime != + carbonDatasourceHadoopRelation.carbonTable.getTableLastUpdatedTime) { + refreshTable(name) + isRefreshed = true + logInfo(s"Schema changes have been detected for table: $name") } + isRefreshed } }
