This is an automated email from the ASF dual-hosted git repository.

yuzhaojing pushed a commit to branch release-0.12.1-rc1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 739da48f1c19ac088b5b1c06efdaf28ad36275e9
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Sep 27 20:00:59 2022 -0700

    [HUDI-4453] Fix schema to include partition columns in bootstrap operation (#6676)

    Turn off the type inference of the partition column to be consistent
    with existing behavior. Add notes around partition column type inference.
---
 .../HoodieSparkBootstrapSchemaProvider.java        | 33 +++++++++-------------
 .../org/apache/hudi/HoodieBootstrapRelation.scala  |  2 +-
 2 files changed, 14 insertions(+), 21 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index e2a9e68372..b161182b83 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -18,15 +18,14 @@ package org.apache.hudi.client.bootstrap;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.AvroOrcUtils;
-import org.apache.hudi.common.util.ParquetUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -36,8 +35,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Reader;
 import org.apache.orc.TypeDescription;
-import org.apache.parquet.schema.MessageType;
-import org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.StructType;
 
@@ -72,26 +69,22 @@ public class HoodieSparkBootstrapSchemaProvider extends HoodieBootstrapSchemaPro
   }
 
   private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {
-    Configuration hadoopConf = context.getHadoopConf().get();
-    MessageType parquetSchema = new ParquetUtils().readSchema(hadoopConf, filePath);
-
-    hadoopConf.set(
-        SQLConf.PARQUET_BINARY_AS_STRING().key(),
-        SQLConf.PARQUET_BINARY_AS_STRING().defaultValueString());
-    hadoopConf.set(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(),
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP().defaultValueString());
-    hadoopConf.set(
-        SQLConf.CASE_SENSITIVE().key(),
-        SQLConf.CASE_SENSITIVE().defaultValueString());
-    ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(hadoopConf);
-
-    StructType sparkSchema = converter.convert(parquetSchema);
+    // NOTE: The type inference of partition column in the parquet table is turned off explicitly,
+    // to be consistent with the existing bootstrap behavior, where the partition column is String
+    // typed in Hudi table.
+    // TODO(HUDI-4932): add a config to allow type inference of partition column in bootstrap and
+    // support other types of partition column as well
+    ((HoodieSparkEngineContext) context).getSqlContext()
+        .setConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE(), false);
+    StructType parquetSchema = ((HoodieSparkEngineContext) context).getSqlContext().read()
+        .option("basePath", writeConfig.getBootstrapSourceBasePath())
+        .parquet(filePath.toString())
+        .schema();
 
     String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
     String structName = tableName + "_record";
     String recordNamespace = "hoodie." + tableName;
-    return AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, structName, recordNamespace);
+    return AvroConversionUtils.convertStructTypeToAvroSchema(parquetSchema, structName, recordNamespace);
   }
 
   private static Schema getBootstrapSourceSchemaOrc(HoodieWriteConfig writeConfig, HoodieEngineContext context, Path filePath) {

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
index 4ec7f65913..0dd54237ef 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBootstrapRelation.scala
@@ -146,7 +146,7 @@ class HoodieBootstrapRelation(@transient val _sqlContext: SQLContext,
     if (fullSchema == null) {
       logInfo("Inferring schema..")
       val schemaResolver = new TableSchemaResolver(metaClient)
-      val tableSchema = TableSchemaResolver.appendPartitionColumns(schemaResolver.getTableAvroSchemaWithoutMetadataFields, metaClient.getTableConfig.getPartitionFields)
+      val tableSchema = schemaResolver.getTableAvroSchema(false)
      dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema)
      fullSchema = StructType(skeletonSchema.fields ++ dataSchema.fields)
    }
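
A note on the Spark setting this change relies on: SQLConf.PARTITION_COLUMN_TYPE_INFERENCE
is the flag `spark.sql.sources.partitionColumnTypeInference.enabled`, which controls whether
Spark infers a concrete type (int, date, etc.) for Hive-style partition values during
partition discovery or keeps them as strings. The following self-contained Scala sketch
(hypothetical paths, data, and object name; not part of this commit) illustrates the schema
difference that motivates pinning the flag to false during bootstrap:

    import org.apache.spark.sql.SparkSession

    // Sketch only: shows how partition column type inference changes the
    // schema Spark reads from a toy Hive-style partitioned parquet layout.
    object PartitionInferenceDemo extends App {
      val spark = SparkSession.builder()
        .master("local[*]")
        .appName("partition-inference-demo")
        .getOrCreate()
      import spark.implicits._

      // Write a tiny partitioned layout: /tmp/src/dt=2022-09-27/<parquet files>
      val basePath = "/tmp/src"
      Seq((1, "a"), (2, "b")).toDF("id", "name")
        .write.mode("overwrite").parquet(s"$basePath/dt=2022-09-27")

      // Inference on (Spark's default): `dt` is typically inferred as a date.
      spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "true")
      spark.read.option("basePath", basePath)
        .parquet(s"$basePath/dt=2022-09-27").printSchema()

      // Inference off (what getBootstrapSourceSchemaParquet now sets): `dt`
      // stays a string, matching Hudi's existing bootstrap behavior.
      spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      spark.read.option("basePath", basePath)
        .parquet(s"$basePath/dt=2022-09-27").printSchema()

      spark.stop()
    }

The `basePath` read option mirrors the provider code in the diff above: it tells Spark where
partition discovery should start, so `dt` is recognized as a partition column rather than as
part of the data path.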
