This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 26f2c77  [CARBONDATA-3506] Fix alter table failures on partition table with hive.metastore.disallow.incompatible.col.type.changes as true
26f2c77 is described below

commit 26f2c778e5b8c10b2249862877250afdd0062a41
Author: akashrn5 <[email protected]>
AuthorDate: Wed Aug 28 12:05:13 2019 +0530

    [CARBONDATA-3506] Fix alter table failures on partition table with hive.metastore.disallow.incompatible.col.type.changes as true
    
    Problem:
    On Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
    to update the new schema in the external catalog during add column, spark gets the
    catalog table and itself adds the partition columns to the new data schema sent by
    carbon if the table is a partition table. The partition columns therefore appear
    twice, and validation fails in hive.
    When the table has only two columns and one of them is the partition column,
    dropping the non-partition column is invalid, because allowing it would leave a
    table whose columns are all partition columns. So with the above property set to
    true, drop column fails to update the hive metastore.
    On Spark 2.2 and above, a datatype change on a partition column also fails with
    the above property set to true, because the partition column is not sent for the
    schema alter in hive.
    
    Solution:
    On Spark 2.2 and above, do not send the partition columns when sending the new
    schema to spark for the catalog update, as spark takes care of adding the
    partition columns to the schema we send (see the sketch below).
    In the drop scenario above, do not allow the drop if, after dropping the
    specified column, the table would be left with only partition columns.
    Block datatype changes on partition columns on Spark 2.2 and above.
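
    For illustration only, a minimal self-contained Scala sketch of the filtering
    idea (PartitionColumnFilterSketch and its simplified ColumnSchema are
    hypothetical stand-ins, not the CarbonData classes used in the patch):

    object PartitionColumnFilterSketch {
      // Simplified stand-in for CarbonData's ColumnSchema
      case class ColumnSchema(name: String)

      // On Spark 2.2 and above the external catalog re-appends partition columns
      // to the schema it is handed, so strip them first to avoid duplicates.
      def schemaForCatalog(allColumns: Seq[ColumnSchema],
          partitionColumns: Seq[ColumnSchema],
          sparkAtLeast22: Boolean,
          isHivePartitionTable: Boolean): Seq[ColumnSchema] = {
        if (sparkAtLeast22 && isHivePartitionTable) {
          allColumns.filterNot(col => partitionColumns.contains(col))
        } else {
          allColumns
        }
      }

      def main(args: Array[String]): Unit = {
        val partCols = Seq(ColumnSchema("age"))
        val allCols = Seq(ColumnSchema("name"), ColumnSchema("age"))
        // prints List(ColumnSchema(name)): only "name" is sent, spark adds "age" back
        println(schemaForCatalog(allCols, partCols, sparkAtLeast22 = true,
          isHivePartitionTable = true))
      }
    }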
    
    This closes #3367
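
    For reference, a schematic sketch of the two new validations (plain string
    column names and IllegalArgumentException are simplifications to keep the
    snippet self-contained; the patch itself throws MalformedCarbonCommandException
    from the command classes):

    object AlterTableGuardsSketch {
      // Refuse a drop that would leave the table with only partition columns.
      def validateDrop(tableColumns: Seq[String],
          partitionColumns: Seq[String],
          columnsToDrop: Seq[String]): Unit = {
        val remaining = tableColumns.filterNot(col => columnsToDrop.contains(col))
        if (remaining == partitionColumns) {
          throw new IllegalArgumentException(
            "alter table drop column is failed, cannot have the table with all " +
            "columns as partition columns")
        }
      }

      // On Spark 2.2 and above, refuse a datatype change on a partition column.
      def validateDataTypeChange(partitionColumns: Seq[String], column: String): Unit = {
        if (partitionColumns.exists(_.equalsIgnoreCase(column))) {
          throw new IllegalArgumentException(
            s"Alter datatype of the partition column $column is not allowed")
        }
      }
    }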
---
 .../StandardPartitionTableQueryTestCase.scala      | 29 +++++++++++++----
 .../schema/CarbonAlterTableAddColumnCommand.scala  | 20 +++++++++---
 ...nAlterTableColRenameDataTypeChangeCommand.scala | 36 +++++++++++++++++++---
 .../schema/CarbonAlterTableDropColumnCommand.scala | 35 +++++++++++++++++----
 4 files changed, 99 insertions(+), 21 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index c19c0b9..fb4b511 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -21,8 +21,10 @@ import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.util.SparkUtil
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
@@ -439,18 +441,32 @@ test("Creation of partition table should fail if the 
colname in table schema and
 
   test("validate data in partition table after dropping and adding a column") {
     sql("drop table if exists par")
-    sql("create table par(name string) partitioned by (age double) stored by " 
+
+    sql("create table par(name string, add string) partitioned by (age double) 
stored by " +
               "'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
-    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into 
table par options" +
-        s"('header'='false')")
+    sql("insert into par select 'joey','NY',32 union all select 
'chandler','NY',32")
     sql("alter table par drop columns(name)")
     sql("alter table par add columns(name string)")
-    sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into 
table par options" +
-        s"('header'='false')")
-    checkAnswer(sql("select name from par"), Seq(Row("a"),Row("b"), Row(null), 
Row(null)))
+    sql("insert into par select 'joey','NY',32 union all select 
'joey','NY',32")
+    checkAnswer(sql("select name from par"), Seq(Row("NY"),Row("NY"), 
Row(null), Row(null)))
     sql("drop table if exists par")
   }
 
+  test("test drop column when after dropping only partition column remains and 
datatype change on partition column") {
+    sql("drop table if exists onlyPart")
+    sql("create table onlyPart(name string) partitioned by (age int) stored by 
" +
+        "'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
+    val ex1 = intercept[MalformedCarbonCommandException] {
+      sql("alter table onlyPart drop columns(name)")
+    }
+    assert(ex1.getMessage.contains("alter table drop column is failed, cannot 
have the table with all columns as partition column"))
+    if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      val ex2 = intercept[MalformedCarbonCommandException] {
+        sql("alter table onlyPart change age age bigint")
+      }
+      assert(ex2.getMessage.contains("Alter datatype of the partition column 
age is not allowed"))
+    }
+    sql("drop table if exists onlyPart")
+  }
 
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
@@ -488,6 +504,7 @@ test("Creation of partition table should fail if the 
colname in table schema and
     sql("drop table if exists staticpartitionextlocload_new")
     sql("drop table if exists staticpartitionlocloadother_new")
     sql("drop table if exists par")
+    sql("drop table if exists onlyPart")
   }
 
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 3fab2fb..6b0d709 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -102,12 +102,24 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
          carbonTable,
          schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
           thriftTable)(sparkSession)
+      // On Spark 2.2 and above, when we call
+      // alterExternalCatalogForTableWithUpdatedSchema to update the new schema in the
+      // external catalog during add column, spark gets the catalog table and itself adds
+      // the partition columns to the new data schema sent by carbon if the table is a
+      // partition table, which would duplicate the partition columns, so send the
+      // columns without the partition columns
+      val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
+        val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+        val carbonColumnsWithoutPartition =
+          carbonColumns.filterNot(col => partitionColumns.contains(col))
+        Some(carbonColumnsWithoutPartition ++ sortedColsBasedActualSchemaOrder)
+      } else {
+        Some(carbonColumns ++ sortedColsBasedActualSchemaOrder)
+      }
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
-        tableIdentifier, schemaParts, Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))
+        tableIdentifier, schemaParts, cols)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
-        new AlterTableAddColumnPostEvent(sparkSession,
-          carbonTable, alterTableAddColumnsModel)
+        AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel)
       OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 49f651e..d3d63eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -21,10 +21,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo,
-  MetadataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -162,6 +161,18 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
         isDataTypeChange = true
       }
       if (isDataTypeChange) {
+        // if the datatype change operation is on a partition column, then fail the
+        // datatype change operation
+        if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
+          val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
+          partitionColumns.asScala.foreach {
+            col =>
+              if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
+                throw new MalformedCarbonCommandException(
+                  s"Alter datatype of the partition column $newColumnName is not allowed")
+              }
+          }
+        }
         validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
           oldCarbonColumn.head)
       }
@@ -257,7 +268,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
    * @param carbonTable              carbonTable
    * @param tableInfo                tableInfo
    * @param addColumnSchema          added column schema
-   * @param schemaEvolutionEntryList new SchemaEvolutionEntry
+   * @param schemaEvolutionEntry     new SchemaEvolutionEntry
    */
   private def updateSchemaAndRefreshTable(sparkSession: SparkSession,
       carbonTable: CarbonTable,
@@ -275,12 +286,27 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
     // update the schema changed column at the specific index in carbonColumns based on schema order
     carbonColumns
       .update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+    // On Spark 2.2 and above, when we call
+    // alterExternalCatalogForTableWithUpdatedSchema to update the new schema in the
+    // external catalog during rename column or change datatype, spark gets the catalog
+    // table and itself adds the partition columns to the new data schema sent by carbon
+    // if the table is a partition table, which would duplicate the partition columns,
+    // so send the columns without the partition columns
+    val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                      carbonTable.isHivePartitionTable) {
+      val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+      val carbonColumnsWithoutPartition =
+        carbonColumns.filterNot(col => partitionColumns.contains(col))
+      Some(carbonColumnsWithoutPartition)
+    } else {
+      Some(carbonColumns)
+    }
     val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
       carbonTable,
       schemaEvolutionEntry,
       tableInfo)(sparkSession)
     sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-      .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, Some(carbonColumns))
+      .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, columns)
     sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
   }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 31cfdaf..8a2e837 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
 import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -63,6 +63,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
           "alter table drop column is not supported for index datamap")
       }
       val partitionInfo = carbonTable.getPartitionInfo(tableName)
+      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       if (partitionInfo != null) {
         val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
           .map(_.getColumnName)
@@ -74,9 +75,18 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
           throwMetadataException(dbName, tableName, "Partition columns cannot 
be dropped: " +
                                                   s"$partitionColumns")
         }
+
+        // this check is added because, when the table has only two columns, one partition
+        // and one non-partition, dropping the non-partition column would leave a table
+        // with only the partition column, which is wrong
+        if (tableColumns.filterNot(col => alterTableDropColumnModel.columns
+          .contains(col.getColName)).map(_.getColName).equals(partitionColumnSchemaList)) {
+          throw new MalformedCarbonCommandException(
+            "alter table drop column is failed, cannot have the table with all columns as " +
+            "partition columns")
+        }
       }
 
-      val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
       .ColumnSchema]()
       // TODO: if deleted column list includes bucketted column throw an error
@@ -139,11 +149,24 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
         schemaEvolutionEntry,
         tableInfo)(sparkSession)
       // get the columns in schema order and filter the dropped column in the column set
-      val cols = Some(carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
-        .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema}
-        .filterNot(column => delCols.contains(column)))
+      val cols = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+        .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
+        .filterNot(column => delCols.contains(column))
+      // On Spark 2.2 and above, when we call
+      // alterExternalCatalogForTableWithUpdatedSchema to update the new schema in the
+      // external catalog during drop column, spark gets the catalog table and itself adds
+      // the partition columns to the new data schema sent by carbon if the table is a
+      // partition table, which would duplicate the partition columns, so send the
+      // columns without the partition columns
+      val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+                        carbonTable.isHivePartitionTable) {
+        val partitionColumns = partitionInfo.getColumnSchemaList.asScala
+        val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
+        Some(carbonColumnsWithoutPartition)
+      } else {
+        Some(cols)
+      }
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
-        .alterDropColumns(tableIdentifier, schemaParts, cols)
+        .alterDropColumns(tableIdentifier, schemaParts, columns)
       sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
       // TODO: 1. add check for deletion of index tables
       // delete dictionary files for dictionary column and clear dictionary cache from memory
