This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 8691cb7ca7 [CARBONDATA-4342] Fix Desc Columns shows New Column added, even though Alter ADD column query failed
8691cb7ca7 is described below
commit 8691cb7ca7456742691f3fdd2f717dd19dc220c9
Author: Indhumathi27 <[email protected]>
AuthorDate: Thu Jun 9 20:54:30 2022 +0530
[CARBONDATA-4342] Fix Desc Columns shows New Column added, even though Alter ADD column query failed
Why is this PR needed?
1. When the spark.carbon.hive.schema.store property is enabled, alter operations fail with a ClassCastException (see the snippet below).
2. When an Alter add/drop/rename column operation fails because of the issue mentioned above, the revert operation does not restore the old schema.
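For context, the lines removed in the CarbonHiveMetaStore.scala hunk below obtained the HiveClient by casting the session's external catalog directly. If the external catalog is not a plain HiveExternalCatalog (for example, when it is wrapped by another catalog implementation), a cast like this fails with a ClassCastException, which is the likely failure mode behind point 1:

    // Old approach (removed in this commit): direct cast to HiveExternalCatalog.
    val hiveClient = sparkSession
      .sessionState
      .catalog
      .externalCatalog.asInstanceOf[HiveExternalCatalog]
      .client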
What changes were proposed in this PR?
1. Use org.apache.spark.sql.hive.CarbonSessionCatalogUtil#getClient to obtain the HiveClient, which avoids the ClassCastException (see the sketch below).
2. Also revert the schema in the Spark catalog table in case of failure.
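A minimal sketch of the proposed flow, using the identifiers that appear in the diff below. The wrapping object and the two method names here are illustrative only and are not part of the patch; the real changes live in CarbonHiveMetaStore.scala, the alter-table commands, and AlterTableUtil.scala:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.hive.CarbonSessionCatalogUtil
    import org.apache.spark.util.AlterTableUtil
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

    object AlterRevertSketch {

      // 1. Resolve the HiveClient through CarbonSessionCatalogUtil#getClient instead of
      //    casting the external catalog, which avoids the ClassCastException.
      def alterHiveSerdeProperties(sparkSession: SparkSession, dbName: String,
          tableName: String, schemaParts: String): Unit = {
        val hiveClient = CarbonSessionCatalogUtil.getClient(sparkSession)
        hiveClient.runSqlHive(
          s"ALTER TABLE `$dbName`.`$tableName` SET SERDEPROPERTIES($schemaParts)")
        sparkSession.catalog.refreshTable(
          TableIdentifier(tableName, Some(dbName)).quotedString)
      }

      // 2. When an ALTER ADD COLUMN fails, revert the carbon (thrift) schema and also drop
      //    the newly added columns from the Spark catalog table, so DESC no longer shows them.
      def revertFailedAddColumn(sparkSession: SparkSession, carbonTable: CarbonTable,
          newCols: Seq[ColumnSchema], timeStamp: Long): Unit = {
        val dbName = carbonTable.getDatabaseName
        val tableName = carbonTable.getTableName
        AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
        val tableIdentifier = TableIdentifier(tableName, Some(dbName))
        AlterTableUtil.deleteColsAndUpdateSchema(
          carbonTable, newCols, tableIdentifier, sparkSession)
      }
    }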
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4277
---
.../schema/CarbonAlterTableAddColumnCommand.scala | 8 ++-
...nAlterTableColRenameDataTypeChangeCommand.scala | 9 ++-
.../schema/CarbonAlterTableDropColumnCommand.scala | 27 ++------
.../spark/sql/hive/CarbonHiveMetaStore.scala | 6 +-
.../spark/sql/hive/CarbonSessionCatalogUtil.scala | 3 +
.../AlterTableColumnRenameEventListener.scala | 3 +-
.../org/apache/spark/util/AlterTableUtil.scala | 62 +++++++++++++++--
.../restructure/AlterTableRevertTestCase.scala | 78 +++++++++++++++++++---
8 files changed, 148 insertions(+), 48 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 6aea625871..4e3b18f316 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -20,8 +20,9 @@ package org.apache.spark.sql.execution.command.schema
import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel,
AlterTableColumnSchemaGenerator, MetadataCommand}
-import org.apache.spark.sql.hive.CarbonSessionCatalogUtil
+import org.apache.spark.sql.hive.{CarbonSessionCatalogUtil, MockClassForAlterRevertTests}
import org.apache.spark.util.{AlterTableUtil, SparkUtil}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -114,6 +115,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
}
CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier, cols, sparkSession)
sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+ new MockClassForAlterRevertTests().mockForAlterAddColRevertTest()
val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
AlterTableAddColumnPostEvent(sparkSession, carbonTable, alterTableAddColumnsModel)
OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
@@ -123,6 +125,10 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
if (newCols.nonEmpty) {
LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
+ val tableIdentifier = TableIdentifier(tableName, Some(dbName))
+ // drop new cols which are added in catalog table in case of failure
+ AlterTableUtil.deleteColsAndUpdateSchema(carbonTable,
+ newCols, tableIdentifier, sparkSession)
}
throwMetadataException(dbName, tableName,
s"Alter table add operation failed: ${e.getMessage}")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
index 01491fa79e..94eded836e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableColRenameDataTypeChangeCommand.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
import org.apache.hadoop.hive.metastore.api.InvalidOperationException
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableDataTypeChangeModel,
DataTypeInfo, MetadataCommand}
-import org.apache.spark.sql.hive.CarbonSessionCatalogUtil
+import org.apache.spark.sql.hive.{CarbonSessionCatalogUtil, MockClassForAlterRevertTests}
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -312,9 +312,11 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
if (isSchemaEntryRequired) {
addedColumnsList ++= List(columnSchema)
deletedColumnsList ++= List(deletedColumnSchema)
+ timeStamp = System.currentTimeMillis()
schemaEvolutionEntry =
AlterTableUtil.addNewSchemaEvolutionEntry(schemaEvolutionEntry,
addedColumnsList,
- deletedColumnsList)
+ deletedColumnsList,
+ timeStamp)
}
}
@@ -330,6 +332,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
addedTableColumnSchema,
schemaEvolutionEntry,
oldCarbonColumn.head)
+ new MockClassForAlterRevertTests().mockForAlterAddColRevertTest()
val alterTableColRenameAndDataTypeChangePostEvent
: AlterTableColRenameAndDataTypeChangePostEvent =
AlterTableColRenameAndDataTypeChangePostEvent(sparkSession,
carbonTable,
@@ -349,7 +352,7 @@ private[sql] case class CarbonAlterTableColRenameDataTypeChangeCommand(
if (carbonTable != null) {
if (carbonTable.isTransactionalTable) {
AlterTableUtil
- .revertColumnRenameAndDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
+ .revertColumnRenameAndDataTypeChanges(carbonTable, timeStamp)(sparkSession)
}
}
if (isDataTypeChange) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index c4630917d4..024323e0c9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -22,8 +22,8 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
import org.apache.spark.sql.execution.command.{AlterTableDropColumnModel, MetadataCommand}
-import org.apache.spark.sql.hive.CarbonSessionCatalogUtil
-import org.apache.spark.util.{AlterTableUtil, SparkUtil}
+import org.apache.spark.sql.hive.MockClassForAlterRevertTests
+import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -32,7 +32,6 @@ import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.datatype.DataTypes
-import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.events.{AlterTableDropColumnPostEvent,
AlterTableDropColumnPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
@@ -177,25 +176,9 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
carbonTable,
schemaEvolutionEntry,
tableInfo)(sparkSession)
- // get the columns in schema order and filter the dropped column in the column set
- val cols = carbonTable.getCreateOrderColumn.asScala
- .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
- .filterNot(column => delCols.contains(column))
- // When we call
- // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
- // in case of drop column, spark gets the catalog table and then it itself adds the partition
- // columns if the table is partition table for all the new data schema sent by carbon,
- // so there will be duplicate partition columns, so send the columns without partition columns
- val columns = if (carbonTable.isHivePartitionTable) {
- val partitionColumns = partitionInfo.getColumnSchemaList.asScala
- val carbonColumnsWithoutPartition = cols.filterNot(col => partitionColumns.contains(col))
- Some(carbonColumnsWithoutPartition)
- } else {
- Some(cols)
- }
- CarbonSessionCatalogUtil.alterDropColumns(
- tableIdentifier, columns, sparkSession)
- sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+ AlterTableUtil.deleteColsAndUpdateSchema(carbonTable,
+ delCols, tableIdentifier, sparkSession)
+ new MockClassForAlterRevertTests().mockForAlterAddColRevertTest()
// TODO: 1. add check for deletion of index tables
// event will be fired before dropping the columns
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 1f15f9086f..4c069f4b5f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -156,11 +156,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
val dbName = newTableIdentifier.getDatabaseName
val tableName = newTableIdentifier.getTableName
val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
- val hiveClient = sparkSession
- .sessionState
- .catalog
- .externalCatalog.asInstanceOf[HiveExternalCatalog]
- .client
+ val hiveClient = CarbonSessionCatalogUtil.getClient(sparkSession)
hiveClient.runSqlHive(s"ALTER TABLE `$dbName`.`$tableName` SET SERDEPROPERTIES($schemaParts)")
sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
index 7205f67ef1..539e996394 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
@@ -159,4 +159,7 @@ object CarbonSessionCatalogUtil {
class MockClassForAlterRevertTests {
def mockForAlterRevertTest(): Unit = {
}
+
+ def mockForAlterAddColRevertTest(): Unit = {
+ }
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala
index d673b05d58..8439dbe8b4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableColumnRenameEventListener.scala
@@ -136,8 +136,7 @@ class AlterTableColumnRenameEventListener extends OperationEventListener with Lo
val evolutionEntryList = thriftTable.fact_table.schema_evolution
.schema_evolution_history
AlterTableUtil
- .revertColumnRenameAndDataTypeChanges(indexCarbonTable.getDatabaseName,
- indexCarbonTable.getTableName,
+ .revertColumnRenameAndDataTypeChanges(indexCarbonTable,
evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp)(
alterTableColRenameAndDataTypeChangePostEvent.sparkSession)
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index cfa6e310a8..8100236c55 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -288,13 +288,21 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
revertSchema(dbName, tableName, timeStamp, sparkSession) { (thriftTable, evolutionEntryList) =>
val removedSchemas = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ val columnSchemaList = new util.ArrayList[ColumnSchema]
thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>
removedSchemas.asScala.foreach { removedSchemas =>
if (columnSchema.invisible && removedSchemas.column_id == columnSchema.column_id) {
columnSchema.setInvisible(false)
}
}
+ columnSchemaList.add(schemaConverter.fromExternalToWrapperColumnSchema(columnSchema))
}
+ // add back the dropped columns to catalog table in case of failure
+ val tableIdentifier = new TableIdentifier(tableName, Some(dbName))
+ CarbonSessionCatalogUtil.alterAddColumns(tableIdentifier,
+ Some(columnSchemaList.asScala),
+ sparkSession)
}
}
@@ -306,20 +314,33 @@ object AlterTableUtil {
* @param timeStamp
* @param sparkSession
*/
- def revertColumnRenameAndDataTypeChanges(dbName: String, tableName: String, timeStamp: Long)
+ def revertColumnRenameAndDataTypeChanges(carbonTable: CarbonTable, timeStamp: Long)
(sparkSession: SparkSession): Unit = {
- revertSchema(dbName, tableName, timeStamp, sparkSession) { (thriftTable, evolutionEntryList) =>
- val removedColumns = evolutionEntryList.get(evolutionEntryList.size() - 1).removed
+ val tableIdentifier = TableIdentifier(
+ carbonTable.getTableName, Some(carbonTable.getDatabaseName))
+ revertSchema(carbonTable.getDatabaseName,
+ carbonTable.getTableName, timeStamp, sparkSession) { (thriftTable, evolutionEntryList) =>
+ val removedColumns: java.util.List[org.apache.carbondata.format.ColumnSchema] =
+ evolutionEntryList.get(evolutionEntryList.size() - 1).removed
thriftTable.fact_table.table_columns.asScala.foreach { columnSchema =>
removedColumns.asScala.foreach { removedColumn =>
if (columnSchema.column_id.equalsIgnoreCase(removedColumn.column_id) &&
!columnSchema.isInvisible) {
+ columnSchema.setColumn_name(removedColumn.column_name)
columnSchema.setData_type(removedColumn.data_type)
columnSchema.setPrecision(removedColumn.precision)
columnSchema.setScale(removedColumn.scale)
+ val carbonColumns = carbonTable.getCreateOrderColumn().asScala
+ .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn
+ .getColumnSchema
+ }
+ // add back the original columns before rename to catalog table in case of failure
+ CarbonSessionCatalogUtil.alterColumnChangeDataTypeOrRename(
+ tableIdentifier, Some(carbonColumns), sparkSession)
}
}
}
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
}
}
@@ -365,8 +386,8 @@ object AlterTableUtil {
def addNewSchemaEvolutionEntry(
schemaEvolutionEntry: SchemaEvolutionEntry,
addedColumnsList: List[org.apache.carbondata.format.ColumnSchema],
- deletedColumnsList: List[org.apache.carbondata.format.ColumnSchema]): SchemaEvolutionEntry = {
- val timeStamp = System.currentTimeMillis()
+ deletedColumnsList: List[org.apache.carbondata.format.ColumnSchema],
+ timeStamp: Long): SchemaEvolutionEntry = {
val newSchemaEvolutionEntry = if (schemaEvolutionEntry == null) {
new SchemaEvolutionEntry(timeStamp)
} else {
@@ -1191,4 +1212,35 @@ object AlterTableUtil {
}
}
+ /**
+ * Removes the columns specified in params from the catalog table and refresh catalog table
+ * @param carbonTable for which the columns needs to be updated
+ * @param deleteCols to be removed from schema
+ * @param tableIdentifier for the table which requires schema update
+ * @param sparkSession instance
+ */
+ def deleteColsAndUpdateSchema(carbonTable: CarbonTable, deleteCols: Seq[ColumnSchema],
+ tableIdentifier: TableIdentifier, sparkSession: SparkSession): Unit = {
+ // get the columns in schema order and filter the dropped column in the column set
+ val newColsAfterDrop = carbonTable.getCreateOrderColumn.asScala
+ .collect { case carbonColumn if !carbonColumn.isInvisible => carbonColumn.getColumnSchema }
+ .filterNot(column => deleteCols.contains(column))
+ // When we call
+ // alterExternalCatalogForTableWithUpdatedSchema to update the new schema to external catalog
+ // in case of drop column, spark gets the catalog table and then it itself adds the partition
+ // columns if the table is partition table for all the new data schema sent by carbon,
+ // so there will be duplicate partition columns, so send the columns without partition columns
+ val updatedNewColsAfterDrop = if (carbonTable.isHivePartitionTable) {
+ val partitionColumns = carbonTable.getPartitionInfo.getColumnSchemaList.asScala
+ val carbonColumnsWithoutPartition =
+ newColsAfterDrop.filterNot(col => partitionColumns.contains(col))
+ Some(carbonColumnsWithoutPartition)
+ } else {
+ Some(newColsAfterDrop)
+ }
+ CarbonSessionCatalogUtil.alterDropColumns(tableIdentifier,
+ updatedNewColsAfterDrop, sparkSession)
+ sparkSession.catalog.refreshTable(tableIdentifier.quotedString)
+ }
+
}
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 5d2f510c97..c52ef4c006 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -20,7 +20,7 @@ package org.apache.spark.carbondata.restructure
import java.io.File
import mockit.{Mock, MockUp}
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.hive.MockClassForAlterRevertTests
import org.apache.spark.sql.test.TestQueryExecutor
import org.apache.spark.sql.test.util.QueryTest
@@ -32,13 +32,7 @@ import org.apache.carbondata.spark.exception.ProcessMetaDataException
class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll() {
- new MockUp[MockClassForAlterRevertTests]() {
- @Mock
- @throws[ProcessMetaDataException]
- def mockForAlterRevertTest(): Unit = {
- throw new ProcessMetaDataException("default", "reverttest", "thrown in mock")
- }
- }
+ mock()
sql("drop table if exists reverttest")
sql(
"CREATE TABLE reverttest(intField int,stringField string,timestampField
timestamp," +
@@ -98,14 +92,78 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
}
}
+ test("alter add operations revert testcase") {
+ unMock()
+ try {
+ sql("drop table if exists reverttest")
+ sql(
+ "CREATE TABLE reverttest(intField int,stringField string) using
carbondata")
+ sql("insert into reverttest select 1, 'abc'")
+ sql(
+ "Alter table reverttest add columns(newField1 string) TBLPROPERTIES" +
+ "('DEFAULT.VALUE.newField1'='def')")
+ new MockUp[MockClassForAlterRevertTests]() {
+ @Mock
+ @throws[ProcessMetaDataException]
+ def mockForAlterAddColRevertTest(): Unit = {
+ throw new ProcessMetaDataException("default", "reverttest", "thrown in mock")
+ }
+ }
+ // check revert schema with alter add column
+ intercept[ProcessMetaDataException] {
+ sql(
+ "Alter table reverttest add columns(newField2 string) TBLPROPERTIES"
+
+ "('DEFAULT.VALUE.newField2'='def')")
+ }
+ checkAnswerAfterAlter()
+ // check revert schema with alter drop column
+ intercept[ProcessMetaDataException] {
+ sql("Alter table reverttest drop columns(newField1)")
+ }
+ checkAnswerAfterAlter()
+ // check revert schema with alter change column
+ intercept[ProcessMetaDataException] {
+ sql("alter table reverttest change newField1 newField2 string")
+ }
+ checkAnswerAfterAlter()
+
+ def checkAnswerAfterAlter(): Unit = {
+ val desCols = sql("desc reverttest").collect()
+ assert(desCols.length == 3)
+ assert(desCols.mkString("Array(", ", ", ")").contains("newfield1"))
+ checkAnswer(sql("select intField,stringField,newField1 from reverttest"),
+ Seq(Row(1, "abc", "def")))
+ }
+ } finally {
+ mock()
+ }
+ }
+
override def afterAll() {
+ unMock()
+ sql("drop table if exists reverttest")
+ sql("drop table if exists reverttest_fail")
+ }
+
+ private def mock(): Unit = {
new MockUp[MockClassForAlterRevertTests]() {
@Mock
+ @throws[ProcessMetaDataException]
def mockForAlterRevertTest(): Unit = {
+ throw new ProcessMetaDataException("default", "reverttest", "thrown in mock")
}
}
- sql("drop table if exists reverttest")
- sql("drop table if exists reverttest_fail")
}
+ private def unMock(): Unit = {
+ new MockUp[MockClassForAlterRevertTests]() {
+ @Mock
+ def mockForAlterRevertTest(): Unit = {
+ }
+
+ @Mock
+ def mockForAlterAddColRevertTest(): Unit = {
+ }
+ }
+ }
}