This is an automated email from the ASF dual-hosted git repository.
kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
     new 26f2c77  [CARBONDATA-3506] Fix alter table failures on partition table with hive.metastore.disallow.incompatible.col.type.changes as true
26f2c77 is described below
commit 26f2c778e5b8c10b2249862877250afdd0062a41
Author: akashrn5 <[email protected]>
AuthorDate: Wed Aug 28 12:05:13 2019 +0530
[CARBONDATA-3506] Fix alter table failures on partition table with
hive.metastore.disallow.incompatible.col.type.changes as true
Problem:
In Spark 2.2 and above, when we call
alterExternalCatalogForTableWithUpdatedSchema to update the new schema in the
external catalog for an add-column operation, Spark gets the catalog table and
itself appends the partition columns to the new data schema sent by Carbon if
the table is a partition table. This produces duplicate partition columns, so
validation fails in Hive.
When the table has only two columns and one of them is a partition column,
dropping the non-partition column is invalid, because it would leave a table
whose columns are all partition columns. So with the above property set to
true, drop column fails to update the Hive metastore.
In Spark 2.2 and above, a datatype change on a partition column also fails
with the above property set to true, because we do not send the partition
column for the schema alter in Hive.
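For illustration, a minimal sketch of the failing drop and datatype-change
scenarios, mirroring the test added in this commit (it assumes a session
running with hive.metastore.disallow.incompatible.col.type.changes=true):

    sql("create table onlyPart(name string) partitioned by (age int) stored by 'carbondata'")
    // dropping the only non-partition column would leave a table whose columns
    // are all partition columns, so it must fail
    sql("alter table onlyPart drop columns(name)")
    // datatype change on a partition column, blocked on Spark 2.2 and above
    sql("alter table onlyPart change age age bigint")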
Solution:
When sending the new schema to Spark to update the catalog in Spark 2.2 and
above, do not send the partition columns, as Spark takes care of adding the
partition columns to the new schema we send.
For the drop scenario above, do not allow the drop if, after dropping the
given column, the table would contain only partition columns.
Block datatype changes on partition columns in Spark 2.2 and above.
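In essence, each of the three commands changed below now filters the partition
columns out of the schema it hands to the catalog; a condensed sketch of the
shared pattern (identifiers as in the diff):

    // Spark 2.2+ re-appends the table's partition columns to whatever schema
    // it is given, so strip them before handing the schema to the catalog
    val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
        carbonTable.isHivePartitionTable) {
      val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
      Some(carbonColumns.filterNot(col => partitionColumns.contains(col)))
    } else {
      Some(carbonColumns)
    }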
This closes #3367
---
.../StandardPartitionTableQueryTestCase.scala | 29 +++++++++++++----
.../schema/CarbonAlterTableAddColumnCommand.scala | 20 +++++++++---
...nAlterTableColRenameDataTypeChangeCommand.scala | 36 +++++++++++++++++++---
.../schema/CarbonAlterTableDropColumnCommand.scala | 35 +++++++++++++++++----
4 files changed, 99 insertions(+), 21 deletions(-)
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index c19c0b9..fb4b511 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -21,8 +21,10 @@ import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
import org.apache.spark.sql.test.Spark2TestQueryExecutor
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.util.SparkUtil
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
@@ -439,18 +441,32 @@ test("Creation of partition table should fail if the colname in table schema and
test("validate data in partition table after dropping and adding a column") {
sql("drop table if exists par")
- sql("create table par(name string) partitioned by (age double) stored by "
+
+ sql("create table par(name string, add string) partitioned by (age double)
stored by " +
"'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
- sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into
table par options" +
- s"('header'='false')")
+ sql("insert into par select 'joey','NY',32 union all select
'chandler','NY',32")
sql("alter table par drop columns(name)")
sql("alter table par add columns(name string)")
- sql(s"load data local inpath '$resourcesPath/uniqwithoutheader.csv' into
table par options" +
- s"('header'='false')")
- checkAnswer(sql("select name from par"), Seq(Row("a"),Row("b"), Row(null), Row(null)))
+ sql("insert into par select 'joey','NY',32 union all select
'joey','NY',32")
+ checkAnswer(sql("select name from par"), Seq(Row("NY"),Row("NY"), Row(null), Row(null)))
sql("drop table if exists par")
}
+ test("test drop column when after dropping only partition column remains and
datatype change on partition column") {
+ sql("drop table if exists onlyPart")
+ sql("create table onlyPart(name string) partitioned by (age int) stored by
" +
+ "'carbondata' TBLPROPERTIES('cache_level'='blocklet')")
+ val ex1 = intercept[MalformedCarbonCommandException] {
+ sql("alter table onlyPart drop columns(name)")
+ }
+ assert(ex1.getMessage.contains("alter table drop column is failed, cannot have the table with all columns as partition column"))
+ if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ val ex2 = intercept[MalformedCarbonCommandException] {
+ sql("alter table onlyPart change age age bigint")
+ }
+ assert(ex2.getMessage.contains("Alter datatype of the partition column age is not allowed"))
+ }
+ sql("drop table if exists onlyPart")
+ }
private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
val plan = frame.queryExecution.sparkPlan
@@ -488,6 +504,7 @@ test("Creation of partition table should fail if the colname in table schema and
sql("drop table if exists staticpartitionextlocload_new")
sql("drop table if exists staticpartitionlocloadother_new")
sql("drop table if exists par")
+ sql("drop table if exists onlyPart")
}
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 3fab2fb..6b0d709 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableColumnSchemaGenerator, MetadataCommand}
import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -102,12 +102,24 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
thriftTable)(sparkSession)
+ // In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
+ // to update the new schema in the external catalog for add column, Spark gets the
+ // catalog table and itself adds the partition columns to the new data schema sent by
+ // Carbon if the table is a partition table. That would create duplicate partition
+ // columns, so send the columns without the partition columns
+ val cols = if (SparkUtil.isSparkVersionXandAbove("2.2") && carbonTable.isHivePartitionTable) {
+ val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+ val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains(col))
+ Some(carbonColumnsWithoutPartition ++ sortedColsBasedActualSchemaOrder)
+ } else {
+ Some(carbonColumns ++ sortedColsBasedActualSchemaOrder)
+ }
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog].alterAddColumns(
- tableIdentifier, schemaParts, Some(carbonColumns ++ sortedColsBasedActualSchemaOrder))
+ tableIdentifier, schemaParts, cols)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
- new AlterTableAddColumnPostEvent(sparkSession,
- carbonTable, alterTableAddColumnsModel)
+ AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel)
OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent,
operationContext)
LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
} catch {
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 49f651e..d3d63eb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -21,10 +21,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo,
-  MetadataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel, DataTypeInfo, MetadataCommand}
import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -162,6 +161,18 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
isDataTypeChange = true
}
if (isDataTypeChange) {
+ // if the datatype change is requested on a partition column, fail the
+ // datatype change operation
+ if (SparkUtil.isSparkVersionXandAbove("2.2") && null != carbonTable.getPartitionInfo) {
+ val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList
+ partitionColumns.asScala.foreach {
+ col =>
+ if (col.getColumnName.equalsIgnoreCase(oldColumnName)) {
+ throw new MalformedCarbonCommandException(
+ s"Alter datatype of the partition column $oldColumnName is not allowed")
+ }
+ }
+ }
validateColumnDataType(alterTableColRenameAndDataTypeChangeModel.dataTypeInfo,
oldCarbonColumn.head)
}
@@ -257,7 +268,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
* @param carbonTable carbonTable
* @param tableInfo tableInfo
* @param addColumnSchema added column schema
- * @param schemaEvolutionEntryList new SchemaEvolutionEntry
+ * @param schemaEvolutionEntry new SchemaEvolutionEntry
*/
private def updateSchemaAndRefreshTable(sparkSession: SparkSession,
carbonTable: CarbonTable,
@@ -275,12 +286,27 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
// update the schema changed column at the specific index in carbonColumns based on schema order
carbonColumns
.update(schemaOrdinal, schemaConverter.fromExternalToWrapperColumnSchema(addColumnSchema))
+ // In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
+ // to update the new schema in the external catalog for rename column or change
+ // datatype, Spark gets the catalog table and itself adds the partition columns to
+ // the new data schema sent by Carbon if the table is a partition table. That would
+ // create duplicate partition columns, so send the columns without the partition columns
+ val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+ carbonTable.isHivePartitionTable) {
+ val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+ val carbonColumnsWithoutPartition = carbonColumns.filterNot(col => partitionColumns.contains(col))
+ Some(carbonColumnsWithoutPartition)
+ } else {
+ Some(carbonColumns)
+ }
val (tableIdentifier, schemaParts) = AlterTableUtil.updateSchemaInfo(
carbonTable,
schemaEvolutionEntry,
tableInfo)(sparkSession)
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
- .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, Some(carbonColumns))
+ .alterColumnChangeDataTypeOrRename(tableIdentifier, schemaParts, columns)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
}
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 31cfdaf..8a2e837 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
import org.apache.spark.sql.hive.CarbonSessionCatalog
-import org.apache.spark.util.AlterTableUtil
+import org.apache.spark.util.{AlterTableUtil, SparkUtil}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -63,6 +63,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
"alter table drop column is not supported for index datamap")
}
val partitionInfo = carbonTable.getPartitionInfo(tableName)
+ val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
if (partitionInfo != null) {
val partitionColumnSchemaList = partitionInfo.getColumnSchemaList.asScala
.map(_.getColumnName)
@@ -74,9 +75,18 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
throwMetadataException(dbName, tableName, "Partition columns cannot be dropped: " +
s"$partitionColumns")
}
+
+ // this check is added because, when the table has only two columns and one of them
+ // is a partition column, dropping the non-partition column would leave a table with
+ // only partition columns, which is invalid
+ if (tableColumns.filterNot(col => alterTableDropColumnModel.columns
+ .contains(col.getColName)).map(_.getColName).equals(partitionColumnSchemaList)) {
+ throw new MalformedCarbonCommandException(
+ "alter table drop column is failed, cannot have the table with all columns as " +
+ "partition columns")
+ }
}
- val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
.ColumnSchema]()
// TODO: if deleted column list includes bucketted column throw an error
@@ -139,11 +149,24 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
schemaEvolutionEntry,
tableInfo)(sparkSession)
// get the columns in schema order and filter the dropped column in the column set
- val cols = Some(carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
- .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema}
- .filterNot(column => delCols.contains(column)))
+ val cols = carbonTable.getCreateOrderColumn(carbonTable.getTableName).asScala
+ .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
+ .filterNot(column => delCols.contains(column))
+ // In Spark 2.2 and above, when we call alterExternalCatalogForTableWithUpdatedSchema
+ // to update the new schema in the external catalog for drop column, Spark gets the
+ // catalog table and itself adds the partition columns to the new data schema sent by
+ // Carbon if the table is a partition table. That would create duplicate partition
+ // columns, so send the columns without the partition columns
+ val columns = if (SparkUtil.isSparkVersionXandAbove("2.2") &&
+ carbonTable.isHivePartitionTable) {
+ val partitionColumns = partitionInfo.getColumnSchemaList.asScala
+ val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
+ Some(carbonColumnsWithoutPartition)
+ } else {
+ Some(cols)
+ }
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
- .alterDropColumns(tableIdentifier, schemaParts, cols)
+ .alterDropColumns(tableIdentifier, schemaParts, columns)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
// TODO: 1. add check for deletion of index tables
// delete dictionary files for dictionary column and clear dictionary cache from memory