Repository: spark Updated Branches: refs/heads/branch-2.2 c311c5e79 -> 4074ed2e1
[SPARK-22306][SQL][2.2] alter table schema should not erase the bucketing metadata at hive side ## What changes were proposed in this pull request? When we alter table schema, we set the new schema to spark `CatalogTable`, convert it to hive table, and finally call `hive.alterTable`. This causes a problem in Spark 2.2, because hive bucketing metadata is not recognized by Spark, which means a Spark `CatalogTable` representing a hive table is always non-bucketed, and when we convert it to hive table and call `hive.alterTable`, the original hive bucketing metadata will be removed. To fix this bug, we should read out the raw hive table metadata, update its schema, and call `hive.alterTable`. By doing this we can guarantee only the schema is changed, and nothing else. Note that this bug doesn't exist in the master branch, because we've added hive bucketing support and the hive bucketing metadata can be recognized by Spark. I think we should merge this PR to master too, for code cleanup and to reduce the difference between master and 2.2 branch for backporting. ## How was this patch tested? new regression test Author: Wenchen Fan <wenc...@databricks.com> Closes #19622 from cloud-fan/infer. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4074ed2e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4074ed2e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4074ed2e Branch: refs/heads/branch-2.2 Commit: 4074ed2e1363c886878bbf9483e21abd1745f482 Parents: c311c5e Author: Wenchen Fan <wenc...@databricks.com> Authored: Thu Nov 2 12:37:52 2017 +0100 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Thu Nov 2 12:37:52 2017 +0100 ---------------------------------------------------------------------- .../sql/catalyst/catalog/ExternalCatalog.scala | 12 ++--- .../sql/catalyst/catalog/InMemoryCatalog.scala | 7 +-- .../sql/catalyst/catalog/SessionCatalog.scala | 25 ++++----- .../catalyst/catalog/ExternalCatalogSuite.scala | 11 ++-- .../catalyst/catalog/SessionCatalogSuite.scala | 21 ++++++-- .../spark/sql/execution/command/tables.scala | 10 +--- .../spark/sql/hive/HiveExternalCatalog.scala | 57 +++++++++++++------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 11 ++-- .../spark/sql/hive/client/HiveClient.scala | 11 ++++ .../spark/sql/hive/client/HiveClientImpl.scala | 45 ++++++++++------ .../sql/hive/HiveExternalCatalogSuite.scala | 18 +++++++ .../sql/hive/MetastoreDataSourcesSuite.scala | 4 +- .../sql/hive/execution/Hive_2_1_DDLSuite.scala | 2 +- 13 files changed, 148 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala index 18644b0..8db6f79 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala @@ -148,17 +148,15 @@ abstract class ExternalCatalog def alterTable(tableDefinition: CatalogTable): Unit /** - * Alter the schema of a table identified by the provided database and table name. The new schema - * should still contain the existing bucket columns and partition columns used by the table. This - * method will also update any Spark SQL-related parameters stored as Hive table properties (such - * as the schema itself). + * Alter the data schema of a table identified by the provided database and table name. The new + * data schema should not have conflict column names with the existing partition columns, and + * should still contain all the existing data columns. * * @param db Database that table to alter schema for exists in * @param table Name of table to alter schema for - * @param schema Updated schema to be used for the table (must contain existing partition and - * bucket columns) + * @param newDataSchema Updated data schema to be used for the table. 
*/ - def alterTableSchema(db: String, table: String, schema: StructType): Unit + def alterTableDataSchema(db: String, table: String, newDataSchema: StructType): Unit def getTable(db: String, table: String): CatalogTable http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index bf8542c..f83e28f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -301,13 +301,14 @@ class InMemoryCatalog( catalog(db).tables(tableDefinition.identifier.table).table = tableDefinition } - override def alterTableSchema( + override def alterTableDataSchema( db: String, table: String, - schema: StructType): Unit = synchronized { + newDataSchema: StructType): Unit = synchronized { requireTableExists(db, table) val origTable = catalog(db).tables(table).table - catalog(db).tables(table).table = origTable.copy(schema = schema) + val newSchema = StructType(newDataSchema ++ origTable.partitionSchema) + catalog(db).tables(table).table = origTable.copy(schema = newSchema) } override def getTable(db: String, table: String): CatalogTable = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index df8f9aa..bbcfdac 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -336,30 +336,28 @@ class SessionCatalog( } /** - * Alter the schema of a table identified by the provided table identifier. The new schema - * should still contain the existing bucket columns and partition columns used by the table. This - * method will also update any Spark SQL-related parameters stored as Hive table properties (such - * as the schema itself). + * Alter the data schema of a table identified by the provided table identifier. The new data + * schema should not have conflict column names with the existing partition columns, and should + * still contain all the existing data columns. * * @param identifier TableIdentifier - * @param newSchema Updated schema to be used for the table (must contain existing partition and - * bucket columns, and partition columns need to be at the end) + * @param newDataSchema Updated data schema to be used for the table */ - def alterTableSchema( + def alterTableDataSchema( identifier: TableIdentifier, - newSchema: StructType): Unit = { + newDataSchema: StructType): Unit = { val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase)) val table = formatTableName(identifier.table) val tableIdentifier = TableIdentifier(table, Some(db)) requireDbExists(db) requireTableExists(tableIdentifier) - checkDuplication(newSchema) val catalogTable = externalCatalog.getTable(db, table) - val oldSchema = catalogTable.schema - + checkDuplication(newDataSchema ++ catalogTable.partitionSchema) + val oldDataSchema = catalogTable.dataSchema // not supporting dropping columns yet - val nonExistentColumnNames = oldSchema.map(_.name).filterNot(columnNameResolved(newSchema, _)) + val nonExistentColumnNames = + oldDataSchema.map(_.name).filterNot(columnNameResolved(newDataSchema, _)) if (nonExistentColumnNames.nonEmpty) { throw new 
AnalysisException( s""" @@ -368,8 +366,7 @@ class SessionCatalog( """.stripMargin) } - // assuming the newSchema has all partition columns at the end as required - externalCatalog.alterTableSchema(db, table, newSchema) + externalCatalog.alterTableDataSchema(db, table, newDataSchema) } private def columnNameResolved(schema: StructType, colName: String): Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 54ecf44..014d0c0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -245,15 +245,12 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac test("alter table schema") { val catalog = newBasicCatalog() - val tbl1 = catalog.getTable("db2", "tbl1") - val newSchema = StructType(Seq( + val newDataSchema = StructType(Seq( StructField("new_field_1", IntegerType), - StructField("new_field_2", StringType), - StructField("a", IntegerType), - StructField("b", StringType))) - catalog.alterTableSchema("db2", "tbl1", newSchema) + StructField("new_field_2", StringType))) + catalog.alterTableDataSchema("db2", "tbl1", newDataSchema) val newTbl1 = catalog.getTable("db2", "tbl1") - assert(newTbl1.schema == newSchema) + assert(newTbl1.dataSchema == newDataSchema) } test("get table") { http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 9c1b638..a8c6c06 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -452,9 +452,9 @@ abstract class SessionCatalogSuite extends PlanTest { withBasicCatalog { sessionCatalog => sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1") - sessionCatalog.alterTableSchema( + sessionCatalog.alterTableDataSchema( TableIdentifier("t1", Some("default")), - StructType(oldTab.dataSchema.add("c3", IntegerType) ++ oldTab.partitionSchema)) + StructType(oldTab.dataSchema.add("c3", IntegerType))) val newTab = sessionCatalog.externalCatalog.getTable("default", "t1") // construct the expected table schema @@ -464,13 +464,26 @@ abstract class SessionCatalogSuite extends PlanTest { } } + test("alter table add columns which are conflicting with partition columns") { + withBasicCatalog { sessionCatalog => + sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) + val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1") + val e = intercept[AnalysisException] { + sessionCatalog.alterTableDataSchema( + TableIdentifier("t1", Some("default")), + StructType(oldTab.dataSchema.add("a", IntegerType))) + }.getMessage + assert(e.contains("Found duplicate column(s): a")) + } + } + test("alter table drop columns") { withBasicCatalog { sessionCatalog => sessionCatalog.createTable(newTable("t1", "default"), ignoreIfExists = false) val oldTab = sessionCatalog.externalCatalog.getTable("default", "t1") val e = intercept[AnalysisException] { - sessionCatalog.alterTableSchema( - TableIdentifier("t1", Some("default")), 
StructType(oldTab.schema.drop(1))) + sessionCatalog.alterTableDataSchema( + TableIdentifier("t1", Some("default")), StructType(oldTab.dataSchema.drop(1))) }.getMessage assert(e.contains("We don't support dropping columns yet.")) } http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 6348638..8b61240 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -187,11 +187,10 @@ case class AlterTableRenameCommand( */ case class AlterTableAddColumnsCommand( table: TableIdentifier, - columns: Seq[StructField]) extends RunnableCommand { + colsToAdd: Seq[StructField]) extends RunnableCommand { override def run(sparkSession: SparkSession): Seq[Row] = { val catalog = sparkSession.sessionState.catalog val catalogTable = verifyAlterTableAddColumn(catalog, table) - try { sparkSession.catalog.uncacheTable(table.quotedString) } catch { @@ -199,12 +198,7 @@ case class AlterTableAddColumnsCommand( log.warn(s"Exception when attempting to uncache table ${table.quotedString}", e) } catalog.refreshTable(table) - - // make sure any partition columns are at the end of the fields - val reorderedSchema = catalogTable.dataSchema ++ columns ++ catalogTable.partitionSchema - catalog.alterTableSchema( - table, catalogTable.schema.copy(fields = reorderedSchema.toArray)) - + catalog.alterTableDataSchema(table, StructType(catalogTable.dataSchema ++ colsToAdd)) Seq.empty[Row] } http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 2ea4e15..1c26d98 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -138,16 +138,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } /** - * Checks the validity of column names. Hive metastore disallows the table to use comma in + * Checks the validity of data column names. Hive metastore disallows the table to use comma in * data column names. Partition columns do not have such a restriction. Views do not have such * a restriction. */ - private def verifyColumnNames(table: CatalogTable): Unit = { - if (table.tableType != VIEW) { - table.dataSchema.map(_.name).foreach { colName => + private def verifyDataSchema( + tableName: TableIdentifier, tableType: CatalogTableType, dataSchema: StructType): Unit = { + if (tableType != VIEW) { + dataSchema.map(_.name).foreach { colName => if (colName.contains(",")) { throw new AnalysisException("Cannot create a table having a column whose name contains " + - s"commas in Hive metastore. Table: ${table.identifier}; Column: $colName") + s"commas in Hive metastore. 
Table: $tableName; Column: $colName") } } } @@ -218,7 +219,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val table = tableDefinition.identifier.table requireDbExists(db) verifyTableProperties(tableDefinition) - verifyColumnNames(tableDefinition) + verifyDataSchema( + tableDefinition.identifier, tableDefinition.tableType, tableDefinition.dataSchema) if (tableExists(db, table) && !ignoreIfExists) { throw new TableAlreadyExistsException(db = db, table = table) @@ -295,7 +297,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat storage = table.storage.copy( locationUri = None, properties = storagePropsWithLocation), - schema = table.partitionSchema, + schema = StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema), bucketSpec = None, properties = table.properties ++ tableProperties) } @@ -312,6 +314,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat None } + // TODO: empty data schema is not hive compatible, we only do it to keep behavior as it was + // because previously we generate the special empty schema in `HiveClient`. Remove this in + // Spark 2.3. 
+ val schema = if (table.dataSchema.isEmpty) { + StructType(EMPTY_DATA_SCHEMA ++ table.partitionSchema) + } else { + table.schema + } + table.copy( storage = table.storage.copy( locationUri = location, @@ -320,6 +331,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat serde = serde.serde, properties = storagePropsWithLocation ), + schema = schema, properties = table.properties ++ tableProperties) } @@ -630,32 +642,32 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat } } - override def alterTableSchema(db: String, table: String, schema: StructType): Unit = withClient { + override def alterTableDataSchema( + db: String, table: String, newDataSchema: StructType): Unit = withClient { requireTableExists(db, table) - val rawTable = getRawTable(db, table) - // Add table metadata such as table schema, partition columns, etc. to table properties. - val updatedProperties = rawTable.properties ++ tableMetaToTableProps(rawTable, schema) - val withNewSchema = rawTable.copy(properties = updatedProperties, schema = schema) - verifyColumnNames(withNewSchema) + val oldTable = getTable(db, table) + verifyDataSchema(oldTable.identifier, oldTable.tableType, newDataSchema) + val schemaProps = + tableMetaToTableProps(oldTable, StructType(newDataSchema ++ oldTable.partitionSchema)).toMap - if (isDatasourceTable(rawTable)) { + if (isDatasourceTable(oldTable)) { // For data source tables, first try to write it with the schema set; if that does not work, // try again with updated properties and the partition schema. This is a simplified version of // what createDataSourceTable() does, and may leave the table in a state unreadable by Hive // (for example, the schema does not match the data source schema, or does not match the // storage descriptor). 
try { - client.alterTable(withNewSchema) + client.alterTableDataSchema(db, table, newDataSchema, schemaProps) } catch { case NonFatal(e) => val warningMessage = - s"Could not alter schema of table ${rawTable.identifier.quotedString} in a Hive " + + s"Could not alter schema of table ${oldTable.identifier.quotedString} in a Hive " + "compatible way. Updating Hive metastore in Spark SQL specific format." logWarning(warningMessage, e) - client.alterTable(withNewSchema.copy(schema = rawTable.partitionSchema)) + client.alterTableDataSchema(db, table, EMPTY_DATA_SCHEMA, schemaProps) } } else { - client.alterTable(withNewSchema) + client.alterTableDataSchema(db, table, newDataSchema, schemaProps) } } @@ -1191,6 +1203,15 @@ object HiveExternalCatalog { val TABLE_PARTITION_PROVIDER_CATALOG = "catalog" val TABLE_PARTITION_PROVIDER_FILESYSTEM = "filesystem" + // When storing data source tables in hive metastore, we need to set data schema to empty if the + // schema is hive-incompatible. However we need a hack to preserve existing behavior. Before + // Spark 2.0, we do not set a default serde here (this was done in Hive), and so if the user + // provides an empty schema Hive would automatically populate the schema with a single field + // "col". However, after SPARK-14388, we set the default serde to LazySimpleSerde so this + // implicit behavior no longer happens. Therefore, we need to do it in Spark ourselves. + val EMPTY_DATA_SCHEMA = new StructType() + .add("col", "array<string>", nullable = true, comment = "from deserializer") + /** * Returns the fully qualified name used in table properties for a particular column stat. 
* For example, for column "mycol", and "min" stat, this should return http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index f23b27c..f858dd9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -246,11 +246,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log inferredSchema match { case Some(dataSchema) => - val schema = StructType(dataSchema ++ relation.tableMeta.partitionSchema) if (inferenceMode == INFER_AND_SAVE) { - updateCatalogSchema(relation.tableMeta.identifier, schema) + updateDataSchema(relation.tableMeta.identifier, dataSchema) } - relation.tableMeta.copy(schema = schema) + val newSchema = StructType(dataSchema ++ relation.tableMeta.partitionSchema) + relation.tableMeta.copy(schema = newSchema) case None => logWarning(s"Unable to infer schema for table $tableName from file format " + s"$fileFormat (inference mode: $inferenceMode). 
Using metastore schema.") @@ -261,10 +261,9 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log } } - private def updateCatalogSchema(identifier: TableIdentifier, schema: StructType): Unit = try { - val db = identifier.database.get + private def updateDataSchema(identifier: TableIdentifier, newDataSchema: StructType): Unit = try { logInfo(s"Saving case-sensitive schema for table ${identifier.unquotedString}") - sparkSession.sharedState.externalCatalog.alterTableSchema(db, identifier.table, schema) + sparkSession.sessionState.catalog.alterTableDataSchema(identifier, newDataSchema) } catch { case NonFatal(ex) => logWarning(s"Unable to save case-sensitive schema for table ${identifier.unquotedString}", ex) http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index 16a80f9..492a2ea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.types.StructType /** @@ -89,6 +90,16 @@ private[hive] trait HiveClient { /** Updates the given table with new metadata, optionally renaming the table. */ def alterTable(tableName: String, table: CatalogTable): Unit + /** + * Updates the given table with a new data schema and table properties, and keep everything else + * unchanged. 
+ * + * TODO(cloud-fan): it's a little hacky to introduce the schema table properties here in + * `HiveClient`, but we don't have a cleaner solution now. + */ + def alterTableDataSchema( + dbName: String, tableName: String, newDataSchema: StructType, schemaProps: Map[String, String]) + /** Creates a new database with the given name. */ def createDatabase(database: CatalogDatabase, ignoreIfExists: Boolean): Unit http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 2cf11f4..541797d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -46,8 +46,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} import org.apache.spark.sql.hive.client.HiveClientImpl._ import org.apache.spark.sql.types._ import org.apache.spark.util.{CircularBuffer, Utils} @@ -462,6 +461,33 @@ private[hive] class HiveClientImpl( shim.alterTable(client, qualifiedTableName, hiveTable) } + override def alterTableDataSchema( + dbName: String, + tableName: String, + newDataSchema: StructType, + schemaProps: Map[String, String]): Unit = withHiveState { + val oldTable = client.getTable(dbName, 
tableName) + val hiveCols = newDataSchema.map(toHiveColumn) + oldTable.setFields(hiveCols.asJava) + + // remove old schema table properties + val it = oldTable.getParameters.entrySet.iterator + while (it.hasNext) { + val entry = it.next() + val isSchemaProp = entry.getKey.startsWith(DATASOURCE_SCHEMA_PART_PREFIX) || + entry.getKey == DATASOURCE_SCHEMA || entry.getKey == DATASOURCE_SCHEMA_NUMPARTS + if (isSchemaProp) { + it.remove() + } + } + + // set new schema table properties + schemaProps.foreach { case (k, v) => oldTable.setProperty(k, v) } + + val qualifiedTableName = s"$dbName.$tableName" + shim.alterTable(client, qualifiedTableName, oldTable) + } + override def createPartitions( db: String, table: String, @@ -837,20 +863,7 @@ private[hive] object HiveClientImpl { val (partCols, schema) = table.schema.map(toHiveColumn).partition { c => table.partitionColumnNames.contains(c.getName) } - // after SPARK-19279, it is not allowed to create a hive table with an empty schema, - // so here we should not add a default col schema - if (schema.isEmpty && HiveExternalCatalog.isDatasourceTable(table)) { - // This is a hack to preserve existing behavior. Before Spark 2.0, we do not - // set a default serde here (this was done in Hive), and so if the user provides - // an empty schema Hive would automatically populate the schema with a single - // field "col". However, after SPARK-14388, we set the default serde to - // LazySimpleSerde so this implicit behavior no longer happens. Therefore, - // we need to do it in Spark ourselves. 
- hiveTable.setFields( - Seq(new FieldSchema("col", "array<string>", "from deserializer")).asJava) - } else { - hiveTable.setFields(schema.asJava) - } + hiveTable.setFields(schema.asJava) hiveTable.setPartCols(partCols.asJava) userName.foreach(hiveTable.setOwner) hiveTable.setCreateTime((table.createTime / 1000).toInt) http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index d43534d..2e35fde 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -89,4 +89,22 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { assert(restoredTable.schema == newSchema) } } + + test("SPARK-22306: alter table schema should not erase the bucketing metadata at hive side") { + val catalog = newBasicCatalog() + externalCatalog.client.runSqlHive( + """ + |CREATE TABLE db1.t(a string, b string) + |CLUSTERED BY (a, b) SORTED BY (a, b) INTO 10 BUCKETS + |STORED AS PARQUET + """.stripMargin) + + val newSchema = new StructType().add("a", "string").add("b", "string").add("c", "string") + catalog.alterTableDataSchema("db1", "t", newSchema) + + assert(catalog.getTable("db1", "t").schema == newSchema) + val bucketString = externalCatalog.client.runSqlHive("DESC FORMATTED db1.t") + .filter(_.contains("Num Buckets")).head + assert(bucketString.contains("10")) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 32e97eb..c0acffb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -746,7 +746,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val hiveTable = CatalogTable( identifier = TableIdentifier(tableName, Some("default")), tableType = CatalogTableType.MANAGED, - schema = new StructType, + schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA, provider = Some("json"), storage = CatalogStorageFormat( locationUri = None, @@ -1271,7 +1271,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv val hiveTable = CatalogTable( identifier = TableIdentifier("t", Some("default")), tableType = CatalogTableType.MANAGED, - schema = new StructType, + schema = HiveExternalCatalog.EMPTY_DATA_SCHEMA, provider = Some("json"), storage = CatalogStorageFormat.empty, properties = Map( http://git-wip-us.apache.org/repos/asf/spark/blob/4074ed2e/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala index 5c248b9..bc82887 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/Hive_2_1_DDLSuite.scala @@ -117,7 +117,7 @@ class Hive_2_1_DDLSuite extends SparkFunSuite with TestHiveSingleton with Before spark.sql(createTableStmt) val oldTable = spark.sessionState.catalog.externalCatalog.getTable("default", tableName) catalog.createTable(oldTable, true) - 
catalog.alterTableSchema("default", tableName, updatedSchema) + catalog.alterTableDataSchema("default", tableName, updatedSchema) val updatedTable = catalog.getTable("default", tableName) assert(updatedTable.schema.fieldNames === updatedSchema.fieldNames) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org