This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit ef26a4a0a556d574cc30c74c674234a4564f34c1
Author: akashrn5 <[email protected]>
AuthorDate: Wed Aug 28 12:05:13 2019 +0530

    [CARBONDATA-3506] Fix alter table failures on partition table with hive.metastore.disallow.incompatible.col.type.changes as true

    Problem:
    In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
    to update the new schema in the external catalog during add column, Spark fetches
    the catalog table and then itself appends the partition columns to the new data
    schema sent by Carbon if the table is a partition table. This produces duplicate
    partition columns, so validation fails in Hive.

    When the table has only two columns and one of them is a partition column, dropping
    the non-partition column is invalid, because allowing it would leave a table whose
    columns are all partition columns. So, with the above property set to true, drop
    column fails to update the Hive metastore.

    In Spark 2.2 and above, a datatype change on a partition column with the above
    property set to true also fails, because we do not send the partition column in
    the schema alter to Hive.

    Solution:
    When sending the new schema to Spark to update in the catalog, do not send the
    partition columns in Spark 2.2 and above, since Spark takes care of adding the
    partition columns to the schema we send. In the drop scenario above, do not allow
    drop column if, after dropping the specified column, the table would have only
    partition columns. Block datatype changes on partition columns in Spark 2.2 and
    above.

    This closes #3367
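The idea shared by all three command changes below, as a minimal sketch: on Spark 2.2+
the partition columns are filtered out of the column list handed to the external
catalog, because Spark re-appends them itself. Column lists are simplified to plain
strings here, and CatalogSchemaSketch/columnsForCatalogUpdate are illustrative names,
not the actual CarbonData API:

    object CatalogSchemaSketch {
      // Sketch only: the real code operates on CarbonData ColumnSchema objects and
      // uses SparkUtil.isSparkVersionXandAbove and CarbonTable.isHivePartitionTable.
      def columnsForCatalogUpdate(
          allColumns: Seq[String],
          partitionColumns: Seq[String],
          isSpark22OrAbove: Boolean,
          isHivePartitionTable: Boolean): Seq[String] = {
        if (isSpark22OrAbove && isHivePartitionTable) {
          // strip the partition columns; Spark adds them back while updating the catalog
          allColumns.filterNot(c => partitionColumns.contains(c))
        } else {
          allColumns
        }
      }

      def main(args: Array[String]): Unit = {
        // table par(name string, add string) partitioned by (age double), as in the test below
        println(columnsForCatalogUpdate(
          Seq("name", "add", "age"), Seq("age"),
          isSpark22OrAbove = true, isHivePartitionTable = true)) // List(name, add)
      }
    }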
---
 .../StandardPartitionTableQueryTestCase.scala      | 29 +++++++++++++----
 .../schema/CarbonAlterTableAddColumnCommand.scala  | 20 +++++++++---
 ...nAlterTableColRenameDataTypeChangeCommand.scala | 36 +++++++++++++++++++---
 .../schema/CarbonAlterTableDropColumnCommand.scala | 35 +++++++++++++++++----
 4 files changed, 99 insertions(+), 21 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index c19c0b9..fb4b511 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -21,8 +21,10 @@ import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
@@ -439,18 +441,32 @@ test("Creation of partition table should fail if the colname in table schema and
   test("validate data in partition table after dropping and adding a column") {
     sql("drop table if exists par")
-    sql("create table par(name string) partitioned by (age double) stored by " +
+    sql("create table par(name string, add string) partitioned by (age double) stored by " +
       "'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
-    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
-      s"('header'='false')")
+    sql("insert into par select 'joey','NY',32 union all select 'chandler','NY',32")
     sql("alter table par drop columns(name)")
     sql("alter table par add columns(name string)")
-    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into table par options" +
-      s"('header'='false')")
-    checkAnswer(sql("select name from par"), Seq(Row("a"),Row("b"), Row(null), Row(null)))
+    sql("insert into par select 'joey','NY',32 union all select 'joey','NY',32")
+    checkAnswer(sql("select name from par"), Seq(Row("NY"),Row("NY"), Row(null), Row(null)))
     sql("drop table if exists par")
   }
 
+  test("test drop column when after dropping only partition column remains and datatype change on partition column") {
+    sql("drop table if exists onlyPart")
+    sql("create table onlyPart(name string) partitioned by (age int) stored by " +
+      "'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
+    val ex1 = intercept[MalformedCarbonCommandException] {
+      sql("alter table onlyPart drop columns(name)")
+    }
+    assert(ex1.getMessage.contains("alter table drop column is failed, cannot have the table with all columns as partition column"))
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val ex2 = intercept[MalformedCarbonCommandException] {
+        sql("alter table onlyPart change age age bigint")
+      }
+      assert(ex2.getMessage.contains("Alter datatype of the partition column age is not allowed"))
+    }
+    sql("drop table if exists onlyPart")
+  }
+
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
@@ -488,6 +504,7 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists staticpartitionextlocload_new")
     sql("drop table if exists staticpartitionlocloadother_new")
     sql("drop table if exists par")
+    sql("drop table if exists onlyPart")
   }
 }

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 3fab2fb..6b0d709 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -102,12 +102,24 @@
         carbonTable,
         schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
         thriftTable)(sparkSession)
+      // In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
+      // to update the new schema in the external catalog in case of add column, Spark gets the
+      // catalog table and then itself adds the partition columns to the new data schema sent by
+      // Carbon if the table is a partition table, which would produce duplicate partition
+      // columns, so send the columns without the partition columns
+      val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
+        val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+        val carbonColumnsWithoutPartition = carbonColumns.filterNot(
+          col => partitionColumns.contains(col))
+        Some(carbonColumnsWithoutPartition ++ sortedColsBasedActualSchemaOrder)
+      } else {
+        Some(carbonColumns ++ sortedColsBasedActualSchemaOrder)
+      }
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
-        tableIdentifier, schemaParts, Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))
+        tableIdentifier, schemaParts, cols)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
-        new AlterTableAddColumnPostEvent(sparkSession,
-          carbonTable, alterTableAddColumnsModel)
+        AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel)
       OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 49f651e..d3d63eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -21,10 +21,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo,
-  MetadataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -162,6 +161,18 @@
         isDataTypeChange = true
       }
       if (isDataTypeChange) {
+        // if the datatype change operation is on a partition column, fail the operation
+        if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
+          val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
+          partitionColumns.asScala.foreach { col =>
+            if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
+              throw new MalformedCarbonCommandException(
+                s"Alter datatype of the partition column $newColumnName is not allowed")
+            }
+          }
+        }
         validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
           oldCarbonColumn.head)
       }
@@ -257,7 +268,7 @@
    * @param carbonTable          carbonTable
    * @param tableInfo            tableInfo
    * @param addColumnSchema      added column schema
-   * @param schemaEvolutionEntryList new SchemaEvolutionEntry
+   * @param schemaEvolutionEntry new SchemaEvolutionEntry
    */
  private def updateSchemaAndRefreshTable(sparkSession: SparkSession,
      carbonTable: CarbonTable,
@@ -275,12 +286,27 @@
     // update the schema changed column at the specific index in carbonColumns based on schema order
     carbonColumns
       .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+    // In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
+    // to update the new schema in the external catalog in case of rename column or change
+    // datatype, Spark gets the catalog table and then itself adds the partition columns to the
+    // new data schema sent by Carbon if the table is a partition table, which would produce
+    // duplicate partition columns, so send the columns without the partition columns
+    val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                      carbonTable.isHivePartitionTable) {
+      val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+      val carbonColumnsWithoutPartition = carbonColumns.filterNot(
+        col => partitionColumns.contains(col))
+      Some(carbonColumnsWithoutPartition)
+    } else {
+      Some(carbonColumns)
+    }
     val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
       carbonTable,
       schemaEvolutionEntry,
       tableInfo)(sparkSession)
     sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-      .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, Some(carbonColumns))
+      .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, columns)
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 31cfdaf..8a2e837 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -63,6 +63,7 @@
           "alter table drop column is not supported for index datamap")
       }
       val partitionInfo = carbonTable.getPartitionInfo(tableName)
+      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       if (partitionInfo != null) {
         val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
           .map(_.getColumnName)
@@ -74,9 +75,18 @@
           throwMetadataException(dbName, tableName, "Partition columns cannot be dropped: " +
             s"$partitionColumns")
         }
+
+        // this check is added because, when the table has only two columns, one partition
+        // column and one non-partition column, dropping the non-partition column would leave a
+        // table with only a partition column, which is wrong
+        if (tableColumns.filterNot(col => alterTableDropColumnModel.columns
+          .contains(col.getColName)).map(_.getColName).equals(partitionColumnSchemaList)) {
+          throw new MalformedCarbonCommandException(
+            "alter table drop column is failed, cannot have the table with all columns as " +
+            "partition columns")
+        }
       }
-      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
         .ColumnSchema]()
       // TODO: if deleted column list includes bucketted column throw an error
@@ -139,11 +149,24 @@
           schemaEvolutionEntry,
           tableInfo)(sparkSession)
       // get the columns in schema order and filter the dropped column in the column set
-      val cols = Some(carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
-        .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema}
-        .filterNot(column => delCols.contains(column)))
+      val cols = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+        .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
+        .filterNot(column => delCols.contains(column))
+      // In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
+      // to update the new schema in the external catalog in case of drop column, Spark gets the
+      // catalog table and then itself adds the partition columns to the new data schema sent by
+      // Carbon if the table is a partition table, which would produce duplicate partition
+      // columns, so send the columns without the partition columns
+      val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                        carbonTable.isHivePartitionTable) {
+        val partitionColumns = partitionInfo.getColumnSchemaList.asScala
+        val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
+        Some(carbonColumnsWithoutPartition)
+      } else {
+        Some(cols)
+      }
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-        .alterDropColumns(tableIdentifier, schemaParts, cols)
+        .alterDropColumns(tableIdentifier, schemaParts, columns)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
       // delete dictionary files for dictionary column and clear dictionary cache from memory
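For reference, the new drop-column guard in isolation, as a minimal standalone sketch.
Plain string column names stand in for CarbonData's ColumnSchema objects, and
DropColumnGuardSketch/leavesOnlyPartitionColumns are illustrative names only:

    object DropColumnGuardSketch {
      // true when dropping the given columns would leave only partition columns,
      // which the command above rejects with MalformedCarbonCommandException
      def leavesOnlyPartitionColumns(
          tableColumns: Seq[String],
          partitionColumns: Seq[String],
          columnsToDrop: Seq[String]): Boolean = {
        tableColumns.filterNot(c => columnsToDrop.contains(c)) == partitionColumns
      }

      def main(args: Array[String]): Unit = {
        // mirrors the onlyPart test: table onlyPart(name string) partitioned by (age int)
        assert(leavesOnlyPartitionColumns(Seq("name", "age"), Seq("age"), Seq("name")))
        println("alter table onlyPart drop columns(name) would be rejected")
      }
    }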
