Repository: carbondata Updated Branches: refs/heads/master 2bad144a2 -> f0f4d7d09
[CARBONDATA-1552] Fixed Alter table commands issues with spark 2.2 This closes #1628 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f0f4d7d0 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f0f4d7d0 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f0f4d7d0 Branch: refs/heads/master Commit: f0f4d7d09a412475dcfde993341340fe25d2aad6 Parents: 2bad144 Author: Manohar <[email protected]> Authored: Thu Dec 7 01:24:14 2017 +0530 Committer: Venkata Ramana G <[email protected]> Committed: Thu Dec 7 17:21:40 2017 +0530 ---------------------------------------------------------------------- .../apache/spark/sql/common/util/QueryTest.scala | 6 +++--- .../schema/CarbonAlterTableAddColumnCommand.scala | 4 +++- .../CarbonAlterTableDataTypeChangeCommand.scala | 5 ++++- .../schema/CarbonAlterTableDropColumnCommand.scala | 5 ++++- .../schema/CarbonAlterTableRenameCommand.scala | 6 +++--- .../schema/CarbonAlterTableSetCommand.scala | 4 +++- .../schema/CarbonAlterTableUnsetCommand.scala | 4 +++- .../spark/sql/hive/CarbonHiveMetaStore.scala | 4 ++-- .../org/apache/spark/util/AlterTableUtil.scala | 17 ++++++++--------- .../src/main/spark2.1/CarbonSessionState.scala | 9 +++++++++ .../src/main/spark2.2/CarbonSessionState.scala | 10 ++++++++++ .../spark/sql/common/util/Spark2QueryTest.scala | 6 +++--- 12 files changed, 55 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala index d482c1d..d80efb8 100644 --- a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala +++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/QueryTest.scala @@ -28,7 +28,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.execution.command.LoadDataCommand -import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog} import org.apache.spark.sql.test.{ResourceRegisterAndCopier, TestQueryExecutor} import org.apache.spark.sql.{CarbonSession, DataFrame, Row, SQLContext} import org.scalatest.Suite @@ -138,8 +138,8 @@ class QueryTest extends PlanTest with Suite { val resourcesPath = TestQueryExecutor.resourcesPath - val hiveClient = sqlContext.sparkSession.asInstanceOf[CarbonSession].sharedState. - externalCatalog.asInstanceOf[HiveExternalCatalog].client + val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .getClient(); } object QueryTest { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala index c8f998b..f3f01bb 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand} +import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -91,7 +92,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand( AlterTableUtil .updateSchemaInfo(carbonTable, schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), - thriftTable)(sparkSession) + thriftTable)(sparkSession, + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent = new AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala index dcee7c3..9bea935 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, MetadataCommand} +import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -96,7 +97,9 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand( schemaEvolutionEntry.setRemoved(List(deletedColumnSchema).asJava) tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0) .setTime_stamp(System.currentTimeMillis) - AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession) + AlterTableUtil + .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession, + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent = new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable, alterTableDataTypeChangeModel) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala index c5924b6..0319d9e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala @@ -22,6 +22,7 @@ import scala.collection.mutable.ListBuffer import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand} +import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -116,7 +117,9 @@ private[sql] case class CarbonAlterTableDropColumnCommand( timeStamp = System.currentTimeMillis val schemaEvolutionEntry = new SchemaEvolutionEntry(timeStamp) schemaEvolutionEntry.setRemoved(deletedColumnSchema.toList.asJava) - AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession) + AlterTableUtil + .updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession, + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) // TODO: 1. add check for deletion of index tables // delete dictionary files for dictionary column and clear dictionary cache from memory new AlterTableDropColumnRDD(sparkSession.sparkContext, http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 9cf36fe..8ebaea1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.schema import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand} -import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog} import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -109,8 +109,8 @@ private[sql] case class CarbonAlterTableRenameCommand( newTableName, carbonTable.getCarbonTableIdentifier.getTableId) var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName) metastore.removeTableFromMetadata(oldDatabaseName, oldTableName) - val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog - .asInstanceOf[HiveExternalCatalog].client + val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .getClient() hiveClient.runSqlHive( s"ALTER TABLE $oldDatabaseName.$oldTableName RENAME TO $oldDatabaseName.$newTableName") hiveClient.runSqlHive( http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala index ffd69df..51c0e6e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.schema import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil private[sql] case class CarbonAlterTableSetCommand( @@ -37,7 +38,8 @@ private[sql] case class CarbonAlterTableSetCommand( tableIdentifier, properties, Nil, - set = true)(sparkSession) + set = true)(sparkSession, + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) Seq.empty } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala index 490dd61..5be5b2c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.schema import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command._ +import org.apache.spark.sql.hive.CarbonSessionCatalog import org.apache.spark.util.AlterTableUtil import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -38,7 +39,8 @@ private[sql] case class CarbonAlterTableUnsetCommand( override def processMetadata(sparkSession: SparkSession): Seq[Row] = { AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String], - propKeys, false)(sparkSession) + propKeys, false)(sparkSession, + sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]) Seq.empty } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala index a41c51e..4b33c40 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala @@ -169,8 +169,8 @@ class CarbonHiveMetaStore extends CarbonFileMetastore { val dbName = oldTableIdentifier.getDatabaseName val tableName = oldTableIdentifier.getTableName val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "") - val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog - .asInstanceOf[HiveExternalCatalog].client + val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .getClient() hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET SERDEPROPERTIES($schemaParts)") sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString) http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala index 58b3362..e757836 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog} +import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog} import org.apache.spark.sql.hive.HiveExternalCatalog._ import org.apache.carbondata.common.logging.LogServiceFactory @@ -127,11 +127,11 @@ object AlterTableUtil { * @param schemaEvolutionEntry * @param thriftTable * @param sparkSession - * @param sessionState + * @param catalog */ def updateSchemaInfo(carbonTable: CarbonTable, schemaEvolutionEntry: SchemaEvolutionEntry, - thriftTable: TableInfo)(sparkSession: SparkSession): Unit = { + thriftTable: TableInfo)(sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = { val dbName = carbonTable.getDatabaseName val tableName = carbonTable.getTableName CarbonEnv.getInstance(sparkSession).carbonMetastore @@ -145,8 +145,7 @@ object AlterTableUtil { val schema = CarbonEnv.getInstance(sparkSession).carbonMetastore .lookupRelation(tableIdentifier)(sparkSession).schema.json val schemaParts = prepareSchemaJsonForAlterTable(sparkSession.sparkContext.getConf, schema) - val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog - .asInstanceOf[HiveExternalCatalog].client + val hiveClient = catalog.getClient(); hiveClient.runSqlHive(s"ALTER TABLE $dbName.$tableName SET TBLPROPERTIES($schemaParts)") sparkSession.catalog.refreshTable(tableIdentifier.quotedString) } @@ -326,11 +325,11 @@ object AlterTableUtil { * @param propKeys * @param set * @param sparkSession - * @param sessionState + * @param catalog */ def modifyTableProperties(tableIdentifier: TableIdentifier, properties: Map[String, String], - propKeys: Seq[String], set: Boolean) - (sparkSession: SparkSession): Unit = { + propKeys: Seq[String], set: Boolean) + (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = { val tableName = tableIdentifier.table val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase) LOGGER.audit(s"Alter table properties request has been received for $dbName.$tableName") @@ -385,7 +384,7 @@ object AlterTableUtil { updateSchemaInfo(carbonTable, schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry), - thriftTable)(sparkSession) + thriftTable)(sparkSession, catalog) LOGGER.info(s"Alter table properties is successful for table $dbName.$tableName") LOGGER.audit(s"Alter table properties is successful for table $dbName.$tableName") } catch { http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/spark2.1/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala index 00de331..7113e63 100644 --- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala @@ -125,6 +125,15 @@ class CarbonSessionCatalog( } isRefreshed } + + /** + * returns hive client from session state + * + * @return + */ + def getClient(): org.apache.spark.sql.hive.client.HiveClient = { + sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/main/spark2.2/CarbonSessionState.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala index 87aebc0..61149eb 100644 --- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala +++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala @@ -123,6 +123,16 @@ class CarbonSessionCatalog( rtnRelation } } + + /** + * returns hive client from HiveExternalCatalog + * + * @return + */ + def getClient(): org.apache.spark.sql.hive.client.HiveClient = { + sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog + .asInstanceOf[HiveExternalCatalog].client + } } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/f0f4d7d0/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala index 73bcddd..169bcf2 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/sql/common/util/Spark2QueryTest.scala @@ -18,13 +18,13 @@ package org.apache.spark.sql.common.util import org.apache.spark.sql.CarbonSession -import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.{CarbonSessionCatalog, HiveExternalCatalog} import org.apache.spark.sql.test.util.QueryTest class Spark2QueryTest extends QueryTest { - val hiveClient = sqlContext.sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog - .asInstanceOf[HiveExternalCatalog].client + val hiveClient = sqlContext.sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog] + .getClient() } \ No newline at end of file
