This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 8a4c86b958254b8b06e5cd3269ef00f27908fb7b Author: Sivabalan Narayanan <[email protected]> AuthorDate: Mon Dec 12 11:39:18 2022 -0800 [HUDI-5296] Allow disable schema on read after enabling (#7421) If someone has enabled schema on read by mistake and never really renamed or dropped a column. it should be feasible to disable schema on read. This patch fixes that. essentially both on read and write path, if "hoodie.schema.on.read.enable" config is not set, it will fallback to regular code path. It might fail or users might miss data if any they have performed any irrevocable changes like renames. But for rest, this should work. --- .../scala/org/apache/hudi/HoodieBaseRelation.scala | 20 ++-- .../org/apache/hudi/HoodieSparkSqlWriter.scala | 119 ++++++++++++++------- .../org/apache/hudi/IncrementalRelation.scala | 6 +- .../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 54 +++++++++- 4 files changed, 146 insertions(+), 53 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala index 5e618f822d3..242d7eb8679 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala @@ -136,7 +136,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, */ protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = { val schemaResolver = new TableSchemaResolver(metaClient) - val internalSchemaOpt = if (!isSchemaEvolutionEnabled) { + val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) { None } else { Try { @@ -635,15 +635,6 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext, private def prunePartitionColumns(dataStructSchema: StructType): StructType = StructType(dataStructSchema.filterNot(f => partitionColumns.contains(f.name))) - - private def isSchemaEvolutionEnabled = { - // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as - // t/h Spark Session configuration (for ex, for Spark SQL) - optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || - sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, - DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean - } } object HoodieBaseRelation extends SparkAdapterSupport { @@ -745,4 +736,13 @@ object HoodieBaseRelation extends SparkAdapterSupport { }) } } + + def isSchemaEvolutionEnabledOnRead(optParams: Map[String, String], sparkSession: SparkSession): Boolean = { + // NOTE: Schema evolution could be configured both t/h optional parameters vehicle as well as + // t/h Spark Session configuration (for ex, for Spark SQL) + optParams.getOrElse(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean || + sparkSession.conf.get(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key, + DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.defaultValue.toString).toBoolean + } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 51cd30f7f23..003dbd35e11 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -21,6 +21,7 @@ import org.apache.avro.Schema import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace} import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieWriterUtils._ @@ -55,6 +56,8 @@ import org.apache.log4j.LogManager import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.internal.StaticSQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.{SPARK_VERSION, SparkContext} @@ -84,6 +87,7 @@ object HoodieSparkSqlWriter { assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set") val path = optParams("path") val basePath = new Path(path) + val spark = sqlContext.sparkSession val sparkContext = sqlContext.sparkContext val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration) tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME)) @@ -98,6 +102,7 @@ object HoodieSparkSqlWriter { val databaseName = hoodieConfig.getStringOrDefault(HoodieTableConfig.DATABASE_NAME, "") val tblName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.").trim + val tableIdentifier = TableIdentifier(tblName, if (databaseName.isEmpty) None else Some(databaseName)) assert(!StringUtils.isNullOrEmpty(hoodieConfig.getString(HoodieWriteConfig.TBL_NAME)), s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.") @@ -139,15 +144,18 @@ object HoodieSparkSqlWriter { // Handle various save modes handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs) val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters)) - // Create the table if not present - if (!tableExists) { + val tableMetaClient = if (tableExists) { + HoodieTableMetaClient.builder + .setConf(sparkContext.hadoopConfiguration) + .setBasePath(path) + .build() + } else { val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT) val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER) val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD) val populateMetaFields = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS) val useBaseFormatMetaFile = hoodieConfig.getBooleanOrDefault(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT); - - val tableMetaClient = HoodieTableMetaClient.withPropertyBuilder() + HoodieTableMetaClient.withPropertyBuilder() .setTableType(tableType) .setDatabaseName(databaseName) .setTableName(tblName) @@ -169,8 +177,8 @@ object HoodieSparkSqlWriter { .setShouldDropPartitionColumns(hoodieConfig.getBooleanOrDefault(HoodieTableConfig.DROP_PARTITION_COLUMNS)) .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE))) .initTable(sparkContext.hadoopConfiguration, path) - tableConfig = tableMetaClient.getTableConfig - } + } + tableConfig = tableMetaClient.getTableConfig val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType) val dropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) @@ -198,7 +206,7 @@ object HoodieSparkSqlWriter { } // Create a HoodieWriteClient & issue the delete. - val internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) + val internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient) val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, null, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key))) @@ -250,10 +258,10 @@ object HoodieSparkSqlWriter { // TODO(HUDI-4472) revisit and simplify schema handling val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema) + val latestTableSchema = getLatestTableSchema(spark, tableIdentifier, tableMetaClient).getOrElse(sourceSchema) val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean - var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) + var internalSchemaOpt = getLatestTableInternalSchema(hoodieConfig, tableMetaClient) val writerSchema: Schema = if (reconcileSchema) { @@ -286,6 +294,34 @@ object HoodieSparkSqlWriter { validateSchemaForHoodieIsDeleted(writerSchema) sparkContext.getConf.registerAvroSchemas(writerSchema) log.info(s"Registered avro schema : ${writerSchema.toString(true)}") +/*======= + case _ => + // Here all other (than DELETE, DELETE_PARTITION) write operations are handled + // + // Convert to RDD[HoodieRecord] + val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, avroRecordName, avroRecordNamespace, + Some(writerSchema)) + + // Check whether partition columns should be persisted w/in the data-files, or should + // be instead omitted from them and simply encoded into the partition path (which is Spark's + // behavior by default) + // TODO move partition columns handling down into the handlers + val shouldDropPartitionColumns = hoodieConfig.getBoolean(DataSourceWriteOptions.DROP_PARTITION_COLUMNS) + val dataFileSchema = if (shouldDropPartitionColumns) { + val truncatedSchema = generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) + // NOTE: We have to register this schema w/ Kryo to make sure it's able to apply an optimization + // allowing it to avoid the need to ser/de the whole schema along _every_ Avro record + registerAvroSchemasWithKryo(sparkContext, truncatedSchema) + truncatedSchema + } else { + writerSchema + } + + // NOTE: Avro's [[Schema]] can't be effectively serialized by JVM native serialization framework + // (due to containing cyclic refs), therefore we have to convert it to string before + // passing onto the Executor + val dataFileSchemaStr = dataFileSchema.toString +>>>>>>> aacfe6de80 ([HUDI-5296] Allow disable schema on read after enabling (#7421))*/ // Convert to RDD[HoodieRecord] val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema, @@ -407,47 +443,48 @@ object HoodieSparkSqlWriter { } /** - * get latest internalSchema from table - * - * @param fs instance of FileSystem. - * @param basePath base path. - * @param sparkContext instance of spark context. - * @param schema incoming record's schema. - * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. - */ - def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = { - try { - if (FSUtils.isTableExists(basePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() + * get latest internalSchema from table + * + * @param config instance of {@link HoodieConfig} + * @param tableMetaClient instance of HoodieTableMetaClient + * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. + */ + def getLatestTableInternalSchema(config: HoodieConfig, + tableMetaClient: HoodieTableMetaClient): Option[InternalSchema] = { + if (!config.getBooleanOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED)) { + Option.empty[InternalSchema] + } else { + try { val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) val internalSchemaOpt = tableSchemaResolver.getTableInternalSchemaFromCommitMetadata if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None - } else { - None + } catch { + case _: Exception => None } - } catch { - case _: Exception => None } } - /** - * Checks if schema needs upgrade (if incoming record's write schema is old while table schema got evolved). - * - * @param fs instance of FileSystem. - * @param basePath base path. - * @param sparkContext instance of spark context. - * @param schema incoming record's schema. - * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. - */ - def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = { - if (FSUtils.isTableExists(basePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder - .setConf(sparkContext.hadoopConfiguration) - .setBasePath(basePath.toString) - .build() - val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) + private def registerAvroSchemasWithKryo(sparkContext: SparkContext, targetAvroSchemas: Schema*): Unit = { + sparkContext.getConf.registerAvroSchemas(targetAvroSchemas: _*) + } + private def getLatestTableSchema(spark: SparkSession, + tableId: TableIdentifier, + tableMetaClient: HoodieTableMetaClient): Option[Schema] = { + val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) + val latestTableSchemaFromCommitMetadata = toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) + latestTableSchemaFromCommitMetadata.orElse { + getCatalogTable(spark, tableId).map { catalogTable => + val (structName, namespace) = getAvroRecordNameAndNamespace(tableId.table) + convertStructTypeToAvroSchema(catalogTable.schema, structName, namespace) + } + } + } + + private def getCatalogTable(spark: SparkSession, tableId: TableIdentifier): Option[CatalogTable] = { + if (spark.sessionState.catalog.tableExists(tableId)) { + Some(spark.sessionState.catalog.getTableMetadata(tableId)) } else { None } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala index 4c763b054ad..80ee3dde5b0 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala @@ -19,6 +19,7 @@ package org.apache.hudi import org.apache.avro.Schema import org.apache.hadoop.fs.{GlobPattern, Path} +import org.apache.hudi.HoodieBaseRelation.isSchemaEvolutionEnabledOnRead import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.client.utils.SparkInternalSchemaConverter import org.apache.hudi.common.fs.FSUtils @@ -47,6 +48,7 @@ import scala.collection.mutable * Relation, that implements the Hoodie incremental view. * * Implemented for Copy_on_write storage. + * TODO: rebase w/ HoodieBaseRelation HUDI-5362 * */ class IncrementalRelation(val sqlContext: SQLContext, @@ -90,7 +92,9 @@ class IncrementalRelation(val sqlContext: SQLContext, val (usedSchema, internalSchema) = { log.info("Inferring schema..") val schemaResolver = new TableSchemaResolver(metaClient) - val iSchema = if (useEndInstantSchema && !commitsToReturn.isEmpty) { + val iSchema : InternalSchema = if (!isSchemaEvolutionEnabledOnRead(optParams, sqlContext.sparkSession)) { + InternalSchema.getEmptyInternalSchema + } else if (useEndInstantSchema && !commitsToReturn.isEmpty) { InternalSchemaCache.searchSchemaAndCache(commitsToReturn.last.getTimestamp.toLong, metaClient, hoodieTable.getConfig.getInternalSchemaCacheEnable) } else { schemaResolver.getTableInternalSchemaFromCommitMetadata.orElse(null) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index 9d955cb8310..056bca37fce 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -22,7 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.testutils.HoodieTestDataGenerator import org.apache.hudi.common.testutils.RawTripTestPayload import org.apache.hudi.config.HoodieWriteConfig -import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkUtils} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.functions.{arrays_zip, col} import org.apache.spark.sql.{Row, SaveMode, SparkSession} @@ -171,6 +171,58 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } } + test("Test Enable and Disable Schema on read") { + withTempDir { tmp => + val tableName = generateTableName + val tablePath = s"${tmp.getCanonicalPath}/$tableName" + if (HoodieSparkUtils.gteqSpark3_1) { + spark.sql("set hoodie.schema.on.read.enable=true") + // Create table + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | location '$tablePath' + | tblproperties ( + | type = 'cow', + | primaryKey = 'id', + | preCombineField = 'ts' + | ) + """.stripMargin) + + // Insert data to the new table. + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + checkAnswer(s"select id, name, price, ts from $tableName")( + Seq(1, "a1", 10.0, 1000) + ) + + // add column + spark.sql(s"alter table $tableName add columns(new_col string)") + val catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName)) + assertResult(Seq("id", "name", "price", "ts", "new_col")) { + HoodieSqlCommonUtils.removeMetaFields(catalogTable.schema).fields.map(_.name) + } + checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null) + ) + // disable schema on read. + spark.sql("set hoodie.schema.on.read.enable=false") + spark.sql(s"refresh table $tableName") + // Insert data to the new table. + spark.sql(s"insert into $tableName values(2, 'a2', 12, 2000, 'e0')") + // write should succeed. and subsequent read should succeed as well. + checkAnswer(s"select id, name, price, ts, new_col from $tableName")( + Seq(1, "a1", 10.0, 1000, null), + Seq(2, "a2", 12.0, 2000, "e0") + ) + } + } + } + test("Test Partition Table alter ") { withTempDir { tmp => Seq("cow", "mor").foreach { tableType =>
