This is an automated email from the ASF dual-hosted git repository.
timbrown pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fec154a8db12 feat(schema): Migrate spark schema conversion utils to their HoodieSchema equivalent (#17765)
fec154a8db12 is described below
commit fec154a8db1256d3b6a15301adf0f25e4c4a6ff5
Author: Tim Brown <[email protected]>
AuthorDate: Thu Jan 1 19:57:28 2026 -0500
feat(schema): Migrate spark schema conversion utils to their HoodieSchema equivalent (#17765)
* migrate schema conversion to use HoodieSchema util
* delete class
* update spark4, update error handling
* fix integ test
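For reviewers, a rough before/after sketch of the call-site migration this commit applies across the modules (illustrative Scala only; tableSchema, the structType values, and the record name/namespace literals are placeholders, while the utility methods are the ones touched in the diff below):

    // before: conversions routed through Avro Schema objects
    val structTypeOld = AvroConversionUtils.convertAvroSchemaToStructType(tableSchema.toAvroSchema)
    val avroSchema    = AvroConversionUtils.convertStructTypeToAvroSchema(structTypeOld, "record", "ns")

    // after: callers stay on HoodieSchema end to end
    val structTypeNew = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(tableSchema)
    val hoodieSchema  = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structTypeNew, "record", "ns")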
---
.../MultipleSparkJobExecutionStrategy.java | 4 +-
.../client/common/SparkReaderContextFactory.java | 4 +-
.../client/utils/SparkMetadataWriterUtils.java | 4 +-
.../hudi/client/utils/SparkPartitionUtils.java | 4 +-
.../hudi/client/utils/SparkValidatorUtils.java | 6 +-
.../hudi/common/model/HoodieSparkRecord.java | 3 +-
.../apache/hudi/merge/SparkRecordMergingUtils.java | 6 +-
.../org/apache/hudi/AvroConversionUtils.scala | 191 +--------------------
.../apache/hudi/HoodieSchemaConversionUtils.scala | 27 ++-
.../util/OrderingValueEngineTypeConverter.java | 4 +-
.../sql/avro/HoodieSparkAvroSchemaConverters.scala | 40 -----
.../org/apache/spark/sql/hudi/SparkAdapter.scala | 5 +-
.../hudi/integ/testsuite/utils/SparkSqlUtils.scala | 9 +-
.../apache/hudi/TestDefaultSparkRecordMerger.java | 12 +-
.../org/apache/hudi/TestAvroConversionUtils.scala | 112 +-----------
.../org/apache/hudi/TestHoodieSparkSqlWriter.scala | 2 +-
.../TestHoodieSparkSqlWriterWithTestFormat.scala | 2 +-
.../model/TestHoodieRecordSerialization.scala | 8 +-
.../read/TestHoodieFileGroupReaderOnSpark.scala | 8 +-
.../hudi/functional/ColumnStatIndexTestBase.scala | 2 +-
.../functional/PartitionStatsIndexTestBase.scala | 5 +-
.../hudi/functional/TestBasicSchemaEvolution.scala | 6 +-
.../apache/hudi/functional/TestCOWDataSource.scala | 6 +-
.../functional/TestColumnStatsIndexWithSQL.scala | 4 +-
.../hudi/functional/TestPartitionStatsIndex.scala | 7 +-
.../util/TestSparkInternalSchemaConverter.scala | 4 +-
.../execution/benchmark/AvroSerDerBenchmark.scala | 4 +-
.../spark/sql/adapter/BaseSpark3Adapter.scala | 10 +-
.../spark/sql/adapter/BaseSpark4Adapter.scala | 10 +-
.../utilities/HoodieMetadataTableValidator.java | 6 +-
.../hudi/utilities/schema/HiveSchemaProvider.java | 15 +-
.../utilities/schema/RowBasedSchemaProvider.java | 4 +-
.../helpers/TestCloudObjectsSelectorCommon.java | 10 +-
.../TestMercifulJsonToRowConverterBase.java | 4 +-
34 files changed, 121 insertions(+), 427 deletions(-)
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index f378ef46a5a3..be6d516a303f 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,7 +18,7 @@
package org.apache.hudi.client.clustering.run.strategy;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -306,7 +306,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
// broadcast reader context.
HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
ReaderContextFactory<InternalRow> readerContextFactory =
getEngineContext().getReaderContextFactory(metaClient);
- StructType sparkSchemaWithMetaFields = AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields.toAvroSchema());
+ StructType sparkSchemaWithMetaFields = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(tableSchemaWithMetaFields);
RDD<InternalRow> internalRowRDD = jsc.parallelize(clusteringOps,
clusteringOps.size()).flatMap(new FlatMapFunction<ClusteringOperation,
InternalRow>() {
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
index a4e88b06915f..4a2f52707121 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/common/SparkReaderContextFactory.java
@@ -18,7 +18,7 @@
package org.apache.hudi.client.common;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
@@ -146,7 +146,7 @@ public class SparkReaderContextFactory implements
ReaderContextFactory<InternalR
Configuration
configs,
SparkAdapter
sparkAdapter) {
try {
- StructType dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema());
+ StructType dataSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema());
return sparkAdapter.createOrcFileReader(false, sqlConf, options,
configs, dataSchema);
} catch (Exception e) {
throw new HoodieException("Failed to broadcast ORC file reader", e);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index b86fa9612d1e..11da57ce3f02 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -19,7 +19,7 @@
package org.apache.hudi.client.utils;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkRowSerDe;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
@@ -295,7 +295,7 @@ public class SparkMetadataWriterUtils {
getExpressionIndexRecordsIterator(readerContextFactory.getContext(),
metaClient, tableSchema, readerSchema, dataWriteConfig, entry));
// Generate dataset with expression index metadata
- StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(readerSchema.toAvroSchema())
+ StructType structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(readerSchema)
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_PARTITION,
DataTypes.StringType, false, Metadata.empty()))
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_RELATIVE_FILE_PATH,
DataTypes.StringType, false, Metadata.empty()))
.add(StructField.apply(HoodieExpressionIndex.HOODIE_EXPRESSION_INDEX_FILE_SIZE,
DataTypes.LongType, false, Metadata.empty()));
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
index 8cf96101bf1a..d3f7f9908324 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkPartitionUtils.java
@@ -18,7 +18,7 @@
package org.apache.hudi.client.utils;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
@@ -41,7 +41,7 @@ public class SparkPartitionUtils {
partitionFields.get(),
partitionPath,
new StoragePath(basePath),
- AvroConversionUtils.convertAvroSchemaToStructType(writerSchema.toAvroSchema()),
+ HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(writerSchema),
hadoopConf.get("timeZone", SQLConf.get().sessionLocalTimeZone()),
hadoopConf.getBoolean("spark.sql.sources.validatePartitionColumns",
true));
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index f3de90d587be..76fd622ec7fc 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -18,7 +18,7 @@
package org.apache.hudi.client.utils;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
@@ -137,8 +137,8 @@ public class SparkValidatorUtils {
try {
return sqlContext.createDataFrame(
sqlContext.emptyDataFrame().rdd(),
- AvroConversionUtils.convertAvroSchemaToStructType(
- new TableSchemaResolver(table.getMetaClient()).getTableAvroSchema()));
+ HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(
+ new TableSchemaResolver(table.getMetaClient()).getTableSchema()));
} catch (Exception e) {
LOG.warn("Cannot get table schema from before state.", e);
LOG.warn("Using the schema from after state (current transaction) to
create the empty Spark dataframe: {}", newStructTypeSchema);
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 6747e1512a38..64d48f2ca8a0 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.model;
import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.SparkFileFormatInternalRecordContext;
import org.apache.hudi.client.model.HoodieInternalRow;
@@ -323,7 +324,7 @@ public class HoodieSparkRecord extends
HoodieRecord<InternalRow> {
if (data == null) {
return Option.empty();
}
- StructType structType = schema == null ? AvroConversionUtils.convertAvroSchemaToStructType(recordSchema) : schema;
+ StructType structType = schema == null ? HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.fromAvroSchema(recordSchema)) : schema;
GenericRecord convertedRecord =
AvroConversionUtils.createInternalRowToAvroConverter(structType, recordSchema,
false).apply(data);
return Option.of(new HoodieAvroIndexedRecord(key, convertedRecord));
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
index a838e1378d63..29408c131dfa 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -19,7 +19,7 @@
package org.apache.hudi.merge;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.HoodieSparkRecord;
@@ -218,8 +218,8 @@ public class SparkRecordMergingUtils {
}
}
StructType mergedStructType = new
StructType(mergedFieldList.toArray(new StructField[0]));
- HoodieSchema mergedSchema = HoodieSchemaCache.intern(HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
- mergedStructType, readerSchema.getName(), readerSchema.getNamespace().orElse(null))));
+ HoodieSchema mergedSchema = HoodieSchemaCache.intern(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
+ mergedStructType, readerSchema.getName(), readerSchema.getNamespace().orElse(null)));
return Pair.of(mergedMapping, Pair.of(mergedStructType,
mergedSchema));
});
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
index 94b06f551a74..26b1dbf584dd 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala
@@ -20,28 +20,22 @@ package org.apache.hudi
import org.apache.hudi.HoodieSparkUtils.{getCatalystRowSerDe, sparkAdapter}
import org.apache.hudi.avro.AvroSchemaUtils
-import org.apache.hudi.avro.HoodieAvroUtils.createNewSchemaField
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.internal.schema.HoodieSchemaException
-import org.apache.avro.{AvroRuntimeException, JsonProperties, Schema}
-import org.apache.avro.Schema.Type
+import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, Row, SparkSession}
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.StructType
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConverters._
-
object AvroConversionUtils {
private val ROW_TO_AVRO_CONVERTER_CACHE =
new ConcurrentHashMap[Tuple3[StructType, Schema, Boolean],
Function1[InternalRow, GenericRecord]]
- private val AVRO_SCHEMA_CACHE = new ConcurrentHashMap[Schema, StructType]
/**
* Creates converter to transform Avro payload into Spark's Catalyst one
@@ -102,10 +96,10 @@ object AvroConversionUtils {
structName: String,
recordNamespace: String): Row => GenericRecord = {
val serde = getCatalystRowSerDe(sourceSqlType)
- val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(sourceSqlType, structName, recordNamespace)
- val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(avroSchema) != avroSchema
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sourceSqlType, structName, recordNamespace)
+ val nullable = schema.isNullable
- val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, avroSchema, nullable)
+ val converter = AvroConversionUtils.createInternalRowToAvroConverter(sourceSqlType, schema.toAvroSchema, nullable)
row => converter.apply(serde.serializeRow(row))
}
@@ -119,77 +113,12 @@ object AvroConversionUtils {
ss.createDataFrame(rdd.mapPartitions { records =>
if (records.isEmpty) Iterator.empty
else {
- val schema = new Schema.Parser().parse(schemaStr)
- val dataType = convertAvroSchemaToStructType(schema)
- val converter = createConverterToRow(schema, dataType)
+ val schema = HoodieSchema.parse(schemaStr)
+ val dataType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
+ val converter = createConverterToRow(schema.toAvroSchema, dataType)
records.map { r => converter(r) }
}
- }, convertAvroSchemaToStructType(new Schema.Parser().parse(schemaStr)))
- }
-
- /**
- * Converts [[StructType]] into Avro's [[Schema]]
- *
- * @param structType Catalyst's [[StructType]]
- * @param qualifiedName Avro's schema qualified name
- * @return Avro schema corresponding to given struct type.
- */
- def convertStructTypeToAvroSchema(structType: DataType,
- qualifiedName: String): Schema = {
- val (namespace, name) = {
- val parts = qualifiedName.split('.')
- (parts.init.mkString("."), parts.last)
- }
- convertStructTypeToAvroSchema(structType, name, namespace)
- }
-
-
- /**
- * Converts [[StructType]] into Avro's [[Schema]]
- *
- * @param structType Catalyst's [[StructType]]
- * @param structName Avro record name
- * @param recordNamespace Avro record namespace
- * @return Avro schema corresponding to given struct type.
- */
- def convertStructTypeToAvroSchema(structType: DataType,
- structName: String,
- recordNamespace: String): Schema = {
- try {
- HoodieSparkAvroSchemaConverters.toAvroType(structType, nullable = false,
structName, recordNamespace)
- } catch {
- case a: AvroRuntimeException => throw new
HoodieSchemaException(a.getMessage, a)
- case e: Exception => throw new HoodieSchemaException("Failed to convert
struct type to avro schema: " + structType, e)
- }
- }
-
- /**
- * Converts Avro's [[Schema]] to Catalyst's [[StructType]]
- */
- def convertAvroSchemaToStructType(avroSchema: Schema): StructType = {
- val loader: java.util.function.Function[Schema, StructType] = key => {
- try {
- HoodieSparkAvroSchemaConverters.toSqlType(key) match {
- case (dataType, _) => dataType.asInstanceOf[StructType]
- }
- } catch {
- case e: Exception => throw new HoodieSchemaException("Failed to
convert avro schema to struct type: " + avroSchema, e)
- }
- }
- AVRO_SCHEMA_CACHE.computeIfAbsent(avroSchema, loader)
- }
-
- /**
- * Converts Avro's [[Schema]] to Catalyst's [[DataType]]
- */
- def convertAvroSchemaToDataType(avroSchema: Schema): DataType = {
- try {
- HoodieSparkAvroSchemaConverters.toSqlType(avroSchema) match {
- case (dataType, _) => dataType
- }
- } catch {
- case e: Exception => throw new HoodieSchemaException("Failed to convert
avro schema to DataType: " + avroSchema, e)
- }
+ }, HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchema.parse(schemaStr)))
}
/**
@@ -201,106 +130,4 @@ object AvroConversionUtils {
val nameParts = qualifiedName.split('.')
(nameParts.last, nameParts.init.mkString("."))
}
-
- /**
- * Recursively aligns the nullable property of hoodie table schema,
supporting nested structures
- */
- def alignFieldsNullability(sourceSchema: StructType, avroSchema: Schema):
StructType = {
- // Converts Avro fields to a Map for efficient lookup
- val avroFieldsMap = avroSchema.getFields.asScala.map(f => (f.name,
f)).toMap
-
- // Recursively process fields
- val alignedFields = sourceSchema.fields.map { field =>
- avroFieldsMap.get(field.name) match {
- case Some(avroField) =>
- // Process the nullable property of the current field
- val alignedField = field.copy(nullable = avroField.schema.isNullable)
-
- // Recursively handle nested structures
- field.dataType match {
- case structType: StructType =>
- // For struct type, recursively process its internal fields
- val nestedAvroSchema = unwrapNullableSchema(avroField.schema)
- if (nestedAvroSchema.getType == Schema.Type.RECORD) {
- alignedField.copy(dataType =
alignFieldsNullability(structType, nestedAvroSchema))
- } else {
- alignedField
- }
-
- case ArrayType(elementType, containsNull) =>
- // For array type, process element type
- val arraySchema = unwrapNullableSchema(avroField.schema)
- if (arraySchema.getType == Schema.Type.ARRAY) {
- val elemSchema = arraySchema.getElementType
- val newElementType = updateElementType(elementType, elemSchema)
- alignedField.copy(dataType = ArrayType(newElementType,
elemSchema.isNullable))
- } else {
- alignedField
- }
-
- case MapType(keyType, valueType, valueContainsNull) =>
- // For Map type, process value type
- val mapSchema = unwrapNullableSchema(avroField.schema)
- if (mapSchema.getType == Schema.Type.MAP) {
- val valueSchema = mapSchema.getValueType
- val newValueType = updateElementType(valueType, valueSchema)
- alignedField.copy(dataType = MapType(keyType, newValueType,
valueSchema.isNullable))
- } else {
- alignedField
- }
-
- case _ => alignedField // Basic types are returned directly
- }
-
- case None => field.copy() // Field not found in Avro schema remains
unchanged
- }
- }
-
- StructType(alignedFields)
- }
-
- /**
- * Returns the non-null schema if the schema is a UNION type containing NULL
- */
- private def unwrapNullableSchema(schema: Schema): Schema = {
- if (schema.getType == Schema.Type.UNION) {
- val types = schema.getTypes.asScala
- val nonNullTypes = types.filter(_.getType != Schema.Type.NULL)
- if (nonNullTypes.size == 1) nonNullTypes.head else schema
- } else {
- schema
- }
- }
-
- /**
- * Updates the element type, handling nested structures
- */
- private def updateElementType(dataType: DataType, avroSchema: Schema):
DataType = {
- dataType match {
- case structType: StructType =>
- if (avroSchema.getType == Schema.Type.RECORD) {
- alignFieldsNullability(structType, avroSchema)
- } else {
- structType
- }
-
- case ArrayType(elemType, containsNull) =>
- if (avroSchema.getType == Schema.Type.ARRAY) {
- val elemSchema = avroSchema.getElementType
- ArrayType(updateElementType(elemType, elemSchema),
elemSchema.isNullable)
- } else {
- dataType
- }
-
- case MapType(keyType, valueType, valueContainsNull) =>
- if (avroSchema.getType == Schema.Type.MAP) {
- val valueSchema = avroSchema.getValueType
- MapType(keyType, updateElementType(valueType, valueSchema),
valueSchema.isNullable)
- } else {
- dataType
- }
-
- case _ => dataType // Basic types are returned directly
- }
- }
}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
index 1af72b9c7b91..86061f1cc414 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSchemaConversionUtils.scala
@@ -22,12 +22,16 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hudi.HoodieSparkUtils.sparkAdapter
import org.apache.hudi.common.schema.{HoodieSchema, HoodieSchemaType,
HoodieSchemaUtils}
import org.apache.hudi.internal.schema.HoodieSchemaException
+
+import org.apache.avro.{AvroRuntimeException, Schema}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
+import java.util.concurrent.ConcurrentHashMap
+
import scala.collection.JavaConverters._
/**
@@ -37,6 +41,8 @@ import scala.collection.JavaConverters._
* handling defaults and nullability alignment.
*/
object HoodieSchemaConversionUtils {
+ private val SCHEMA_CACHE = new ConcurrentHashMap[HoodieSchema, StructType]
+
/**
* Converts HoodieSchema to Catalyst's StructType.
@@ -46,14 +52,20 @@ object HoodieSchemaConversionUtils {
* @throws HoodieSchemaException if conversion fails
*/
def convertHoodieSchemaToStructType(hoodieSchema: HoodieSchema): StructType
= {
- try {
- HoodieSparkSchemaConverters.toSqlType(hoodieSchema) match {
- case (dataType, _) => dataType.asInstanceOf[StructType]
+ val loader: java.util.function.Function[HoodieSchema, StructType] =
+ new java.util.function.Function[HoodieSchema, StructType]() {
+ override def apply(schema: HoodieSchema): StructType = {
+ try {
+ HoodieSparkSchemaConverters.toSqlType(schema) match {
+ case (dataType, _) => dataType.asInstanceOf[StructType]
+ }
+ } catch {
+ case e: Exception => throw new HoodieSchemaException(
+ s"Failed to convert HoodieSchema to StructType: $schema", e)
+ }
+ }
}
- } catch {
- case e: Exception => throw new HoodieSchemaException(
- s"Failed to convert HoodieSchema to StructType: $hoodieSchema", e)
- }
+ SCHEMA_CACHE.computeIfAbsent(hoodieSchema, loader)
}
/**
@@ -126,6 +138,7 @@ object HoodieSchemaConversionUtils {
try {
HoodieSparkSchemaConverters.toHoodieType(structType, nullable,
structName, recordNamespace)
} catch {
+ case a: AvroRuntimeException => throw new HoodieSchemaException(a.getMessage, a)
case e: Exception => throw new HoodieSchemaException(
s"Failed to convert struct type to HoodieSchema: $structType", e)
}
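Reviewer note: the hunk above memoizes conversions in a ConcurrentHashMap keyed by HoodieSchema, replacing the AVRO_SCHEMA_CACHE removed from AvroConversionUtils in this commit. A minimal usage sketch, assuming a hypothetical schemaJson string already holds a valid schema:

    val schema = HoodieSchema.parse(schemaJson)
    val st1 = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
    val st2 = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
    // st2 comes back from SCHEMA_CACHE via computeIfAbsent instead of being re-derived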
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
index aebf8740d46b..271f5a1fa4ce 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/OrderingValueEngineTypeConverter.java
@@ -18,7 +18,7 @@
package org.apache.hudi.util;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
@@ -68,7 +68,7 @@ public class OrderingValueEngineTypeConverter {
if (fieldSchemaOpt.isEmpty()) {
return Function.<Comparable>identity();
} else {
- DataType fieldType = AvroConversionUtils.convertAvroSchemaToDataType(fieldSchemaOpt.get().toAvroSchema());
+ DataType fieldType = HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(fieldSchemaOpt.get());
return createConverter(fieldType, fieldSchemaOpt.get());
}
}).collect(Collectors.toList());
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala
deleted file mode 100644
index a853c1fdecde..000000000000
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/avro/HoodieSparkAvroSchemaConverters.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.spark.sql.avro
-
-import org.apache.avro.Schema
-import org.apache.spark.sql.avro.SchemaConverters.SchemaType
-import org.apache.spark.sql.types.DataType
-
-/**
- * This interface is simply a facade abstracting away Spark's
[[SchemaConverters]] implementation, allowing
- * the rest of the code-base to not depend on it directly
- */
-object HoodieSparkAvroSchemaConverters extends HoodieAvroSchemaConverters {
-
- override def toSqlType(avroSchema: Schema): (DataType, Boolean) =
- SchemaConverters.toSqlType(avroSchema) match {
- case SchemaType(dataType, nullable) => (dataType, nullable)
- }
-
- override def toAvroType(catalystType: DataType, nullable: Boolean,
recordName: String, nameSpace: String): Schema =
- SchemaConverters.toAvroType(catalystType, nullable, recordName, nameSpace)
-
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index af45ebc53444..1afcc78df058 100644
---
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -23,8 +23,7 @@ import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
-import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
-import org.apache.avro.Schema
+import org.apache.hudi.storage.StorageConfiguration
import org.apache.hadoop.conf.Configuration
import org.apache.hudi.common.schema.HoodieSchema
import org.apache.parquet.schema.MessageType
@@ -166,7 +165,7 @@ trait SparkAdapter extends Serializable {
*/
def createRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
- schema: Schema,
+ schema: HoodieSchema,
parameters: java.util.Map[String, String]): BaseRelation
/**
diff --git
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
index e111c9306f40..d4586cac82d5 100644
---
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
+++
b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/utils/SparkSqlUtils.scala
@@ -19,14 +19,15 @@
package org.apache.hudi.integ.testsuite.utils
-import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
+import org.apache.hudi.HoodieSchemaConversionUtils
+import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.util.Option
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import
org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider
-import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.SparkSession
@@ -137,8 +138,8 @@ object SparkSqlUtils {
* @return an array of field names and types
*/
def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)]
= {
- val schema = new Schema.Parser().parse(avroSchemaString)
- val structType = AvroConversionUtils.convertAvroSchemaToStructType(schema)
+ val schema = HoodieSchema.parse(avroSchemaString)
+ val structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
structType.fields.map(field => (field.name, field.dataType.simpleString))
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
index d035f453fe4f..3fc95008cd58 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestDefaultSparkRecordMerger.java
@@ -74,8 +74,7 @@ class TestDefaultSparkRecordMerger {
HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
Row oldValue = getSpecificValue(key, "001", 1L, "file1", 1, "1");
Row newValue = getSpecificValue(key, "002", 2L, "file2", 2, "2");
- HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
- SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+ HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE);
BufferedRecord<InternalRow> oldRecord =
BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY,
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
BufferedRecord<InternalRow> newRecord =
BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY,
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
@@ -100,8 +99,7 @@ class TestDefaultSparkRecordMerger {
HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
Row oldValue = getSpecificValue(key, "001", 1L, "file1", 3, "1");
Row newValue = getSpecificValue(key, "002", 2L, "file2", 2, "2");
- HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
- SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+ HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE);
BufferedRecord<InternalRow> oldRecord =
BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY,
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
BufferedRecord<InternalRow> newRecord =
BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY,
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
@@ -123,8 +121,7 @@ class TestDefaultSparkRecordMerger {
void testMergerWithNewRecordAsDelete() throws IOException {
HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
Row oldValue = getSpecificValue(key, "001", 1L, "file1", 1, "1");
- HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
- SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+ HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE);
BufferedRecord<InternalRow> oldRecord =
BufferedRecords.fromEngineRecord(InternalRow.apply(oldValue.toSeq()), ANY_KEY,
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
BufferedRecord<InternalRow> newRecord =
BufferedRecords.createDelete(key.getRecordKey(), OrderingValues.getDefault());
@@ -143,8 +140,7 @@ class TestDefaultSparkRecordMerger {
void testMergerWithOldRecordAsDelete() throws IOException {
HoodieKey key = new HoodieKey(ANY_KEY, ANY_PARTITION);
Row newValue = getSpecificValue(key, "001", 1L, "file1", 1, "1");
- HoodieSchema schema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(
- SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE));
+ HoodieSchema schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(SPARK_SCHEMA, ANY_NAME, ANY_NAMESPACE);
BufferedRecord<InternalRow> oldRecord =
BufferedRecords.createDelete(key.getRecordKey(), OrderingValues.getDefault());
BufferedRecord<InternalRow> newRecord =
BufferedRecords.fromEngineRecord(InternalRow.apply(newValue.toSeq()), ANY_KEY,
schema, recordContext, Collections.singletonList(INT_COLUMN_NAME), null);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
index a717bca8110b..b8a26c04fa10 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala
@@ -176,12 +176,12 @@ class TestAvroConversionUtils extends FunSuite with
Matchers {
.add("nullableMap", mapType, true).add("map", mapType, false)
.add("nullableArray", arrayType, true).add("array", arrayType, false)
- val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, "SchemaName", "SchemaNS")
val expectedSchemaStr = complexSchemaStr
- val expectedAvroSchema = HoodieSchema.parse(expectedSchemaStr).toAvroSchema
+ val expectedSchema = HoodieSchema.parse(expectedSchemaStr)
- assert(avroSchema.equals(expectedAvroSchema))
+ assert(schema.equals(expectedSchema))
}
test("test convertStructTypeToAvroSchema with Nested StructField comment") {
@@ -194,7 +194,7 @@ class TestAvroConversionUtils extends FunSuite with
Matchers {
.add("nullableMap", mapType, true).add("map",mapType,false)
.add("nullableArray", arrayType, true).add("array",arrayType,false)
- val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, "SchemaName", "SchemaNS")
val expectedSchemaStr = s"""
{
@@ -395,9 +395,9 @@ class TestAvroConversionUtils extends FunSuite with
Matchers {
}
"""
- val expectedAvroSchema = HoodieSchema.parse(expectedSchemaStr).toAvroSchema
+ val expectedSchema = HoodieSchema.parse(expectedSchemaStr)
- assert(avroSchema.equals(expectedAvroSchema))
+ assert(schema.equals(expectedSchema))
}
test("test converter with binary") {
@@ -456,105 +456,7 @@ class TestAvroConversionUtils extends FunSuite with
Matchers {
.add("name", DataTypes.StringType, true)
.add("name", DataTypes.StringType, true)
the[HoodieSchemaException] thrownBy {
- AvroConversionUtils.convertStructTypeToAvroSchema(struct, "SchemaName", "SchemaNS")
+ HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(struct, "SchemaName", "SchemaNS")
} should have message "Duplicate field name in record SchemaNS.SchemaName:
name type:UNION pos:2 and name type:UNION pos:1."
}
-
- test("test alignFieldsNullability function") {
- val sourceTableSchema: StructType =
- StructType(
- Seq(
- StructField("intType", IntegerType, nullable = false),
- StructField("longType", LongType),
- StructField("stringType", StringType, nullable = false),
- StructField("doubleType", DoubleType),
- StructField("floatType", FloatType, nullable = true),
- StructField("structType", new StructType(
- Array(StructField("structType_1", StringType),
StructField("structType_2", StringType)))),
- StructField("dateType", DateType),
- StructField("listType", new ArrayType(StringType, true)),
- StructField("decimalType", new DecimalType(7, 3), nullable = false),
- StructField("timeStampType", TimestampType),
- StructField("mapType", new MapType(StringType, IntegerType, true))
- )
- )
-
- val writeStructSchema: StructType =
- StructType(
- Seq(
- StructField("intType", IntegerType, nullable = false),
- StructField("longType", LongType),
- StructField("stringType", StringType, nullable = true),
- StructField("doubleType", DoubleType),
- StructField("floatType", FloatType, nullable = false),
- StructField("structType", new StructType(
- Array(StructField("structType_1", StringType, nullable = false),
StructField("structType_2", StringType)))),
- StructField("dateType", DateType, nullable = false),
- StructField("listType", new ArrayType(StringType, true)),
- StructField("decimalType", new DecimalType(7, 3)),
- StructField("timeStampType", TimestampType),
- StructField("mapType", new MapType(StringType, IntegerType, true)),
- StructField("notInTableSchemaTimeStampType_1", TimestampType),
- StructField("notInTableSchemaIntType", IntegerType, nullable =
false),
- StructField("notInTableSchemaMapType", new MapType(StringType,
IntegerType, true))
- )
- )
- val tableAvroSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(sourceTableSchema, "data")
-
- val alignedSchema =
AvroConversionUtils.alignFieldsNullability(writeStructSchema, tableAvroSchema)
-
- val nameToNullableSourceSchema = sourceTableSchema.fields.map(item =>
(item.name, item.nullable)).toMap
-
- val nameToNullableWriteSchema = writeStructSchema.fields.map(item =>
(item.name, item.nullable)).toMap
-
- // Validate alignment rules:
- // 1. For fields existing in both schemas: use source table's nullability
- // 2. For fields only in write schema: retain original nullability
- for (field <- alignedSchema.fields) {
- if (nameToNullableSourceSchema.contains(field.name) &&
nameToNullableWriteSchema.contains(field.name)) {
- assertTrue(field.nullable == nameToNullableSourceSchema(field.name))
- }
- if (!nameToNullableSourceSchema.contains(field.name) &&
nameToNullableWriteSchema.contains(field.name)) {
- assertTrue(field.nullable == nameToNullableWriteSchema(field.name))
- }
- }
-
- for (field <- alignedSchema.fields) {
- if (field.name.equals("intType")) {
- // Common field: both schemas specify nullable=false → aligned
nullable=false
- assertFalse(field.nullable)
- }
- if (field.name.equals("longType")) {
- // Common field: both schemas default to nullable=true → aligned
nullable=true
- assertTrue(field.nullable)
- }
- if (field.name.equals("stringType")) {
- // Conflicting case:
- // Write schema (nullable=true) overridden by table schema
(nullable=false) → aligned nullable=false
- assertFalse(field.nullable)
- }
-
- if (field.name.equals("structType")) {
- val fields = field.dataType.asInstanceOf[StructType].fields
- assertTrue(fields.apply(0).nullable)
- assertTrue(fields.apply(1).nullable)
- }
-
- if (field.name.equals("dateType")) {
- // Conflicting case:
- // Write schema specifies nullable=false but table schema defaults to
true → aligned nullable=true
- assertTrue(field.nullable)
- }
-
- if (field.name.equals("notInTableSchemaIntType")) {
- // Write-exclusive field: retains original nullability=false
- assertFalse(field.nullable)
- }
-
- if (field.name.equals("notInTableSchemaMapType")) {
- // Write-exclusive field: retains original nullability=true
- assertTrue(field.nullable)
- }
- }
- }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
index dd3ea6b45df5..561e829dbe35 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala
@@ -507,7 +507,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
- val modifiedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", "example.schema")
+ val modifiedSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "trip", "example.schema")
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
index f81209820e5c..2cf0dc20ad08 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriterWithTestFormat.scala
@@ -378,7 +378,7 @@ class TestHoodieSparkSqlWriterWithTestFormat extends
HoodieSparkWriterTestBase {
// generate the inserts
val schema = DataSourceTestUtils.getStructTypeExampleSchema
val structType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
- val modifiedSchema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, "trip", "example.schema")
+ val modifiedSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, "trip", "example.schema")
val records = DataSourceTestUtils.generateRandomRows(100)
val recordsSeq = convertRowListToSeq(records)
val df = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
index 44f6b03f2748..562f484956f6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -18,8 +18,8 @@
package org.apache.hudi.common.model
-import org.apache.hudi.{HoodieSparkUtils, SparkAdapterSupport, SparkRowSerDe}
-import org.apache.hudi.AvroConversionUtils.{convertStructTypeToAvroSchema, createInternalRowToAvroConverter}
+import org.apache.hudi.{HoodieSchemaConversionUtils, HoodieSparkUtils, SparkAdapterSupport, SparkRowSerDe}
+import org.apache.hudi.AvroConversionUtils.createInternalRowToAvroConverter
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
import
org.apache.hudi.common.model.TestHoodieRecordSerialization.{cloneUsingKryo,
convertToAvroRecord, toUnsafeRow, OverwriteWithLatestAvroPayloadWithEquality}
import org.apache.hudi.common.schema.HoodieSchema
@@ -183,9 +183,9 @@ object TestHoodieRecordSerialization {
}
private def convertToAvroRecord(row: Row): GenericRecord = {
- val schema = convertStructTypeToAvroSchema(row.schema, "testRecord", "testNamespace")
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(row.schema, "testRecord", "testNamespace")
- createInternalRowToAvroConverter(row.schema, schema, nullable = false)
+ createInternalRowToAvroConverter(row.schema, schema.toAvroSchema, nullable = false)
.apply(toUnsafeRow(row, row.schema))
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index 74c547d991b6..9a527b693c6e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.common.table.read
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSchemaConversionUtils, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
import org.apache.hudi.DataSourceWriteOptions.{OPERATION, RECORDKEY_FIELD,
TABLE_TYPE}
import org.apache.hudi.common.config.{HoodieReaderConfig, RecordMergeMode,
TypedProperties}
import org.apache.hudi.common.engine.HoodieReaderContext
@@ -41,7 +41,7 @@ import org.apache.avro.generic.GenericRecord
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{HoodieSparkKryoRegistrar, SparkConf}
import org.apache.spark.sql.{Dataset, HoodieInternalRowUtils, Row, SaveMode,
SparkSession}
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.execution.datasources.SparkColumnarFileReader
@@ -108,7 +108,7 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
override def getHoodieReaderContext(tablePath: String, schema: HoodieSchema,
storageConf: StorageConfiguration[_], metaClient: HoodieTableMetaClient):
HoodieReaderContext[InternalRow] = {
val parquetReader = sparkAdapter.createParquetFileReader(vectorized =
false, spark.sessionState.conf, Map.empty,
storageConf.unwrapAs(classOf[Configuration]))
- val dataSchema = AvroConversionUtils.convertAvroSchemaToStructType(schema.toAvroSchema)
+ val dataSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
val orcReader = sparkAdapter.createOrcFileReader(vectorized = false,
spark.sessionState.conf, Map.empty,
storageConf.unwrapAs(classOf[Configuration]), dataSchema)
val multiFormatReader = new
MultipleColumnarFileFormatReader(parquetReader, orcReader)
new SparkFileFormatInternalRowReaderContext(multiFormatReader, Seq.empty,
Seq.empty, getStorageConf, metaClient.getTableConfig)
@@ -138,7 +138,7 @@ class TestHoodieFileGroupReaderOnSpark extends
TestHoodieFileGroupReaderBase[Int
override def assertRecordsEqual(schema: HoodieSchema, expected: InternalRow,
actual: InternalRow): Unit = {
assertEquals(expected.numFields, actual.numFields)
- val expectedStruct = HoodieSparkAvroSchemaConverters.toSqlType(schema.toAvroSchema)._1.asInstanceOf[StructType]
+ val expectedStruct = HoodieSparkSchemaConverters.toSqlType(schema)._1.asInstanceOf[StructType]
expected.toSeq(expectedStruct).zip(actual.toSeq(expectedStruct)).zipWithIndex.foreach
{
case ((v1, v2), i) =>
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
index 2f18a202639c..50686448b408 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/ColumnStatIndexTestBase.scala
@@ -374,7 +374,7 @@ class ColumnStatIndexTestBase extends
HoodieSparkClientTestBase {
assertEquals(asJson(sort(pExpectedColStatsIndexTableDf.drop(colsToDrop:
_*), pValidationSortColumns)),
asJson(sort(pTransposedColStatsDF.drop(colsToDrop: _*),
pValidationSortColumns)))
- val convertedSchema = AvroConversionUtils.convertAvroSchemaToStructType(AvroConversionUtils.convertStructTypeToAvroSchema(pExpectedColStatsSchema, "col_stats_schema"))
+ val convertedSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(pExpectedColStatsSchema, "col_stats_schema"))
if (testCase.tableType == HoodieTableType.COPY_ON_WRITE) {
val manualColStatsTableDF =
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
index 1ce99067f9c4..81fd89ec6d41 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala
@@ -19,9 +19,8 @@
package org.apache.hudi.functional
-import org.apache.hudi.AvroConversionUtils
+import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils, PartitionStatsIndexSupport}
import org.apache.hudi.DataSourceWriteOptions._
-import org.apache.hudi.PartitionStatsIndexSupport
import org.apache.hudi.TestHoodieSparkUtils.dropMetaFields
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.schema.HoodieSchema
@@ -114,7 +113,7 @@ class PartitionStatsIndexTestBase extends
HoodieStatsIndexTestBase {
val partitionStatsIndex = new PartitionStatsIndexSupport(
spark,
inputDf.schema,
- HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(inputDf.schema, "record", "")),
+ HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(inputDf.schema, "record", ""),
HoodieMetadataConfig.newBuilder().enable(true).build(),
metaClient)
val partitionStats =
partitionStatsIndex.loadColumnStatsIndexRecords(List("partition", "trip_type"),
shouldReadInMemory = true).collectAsList()
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
index 26fa1c722344..34963c7b4e22 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala
@@ -17,9 +17,9 @@
package org.apache.hudi.functional
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport, SparkAdapterSupport}
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSchemaConversionUtils, ScalaAssertionSupport, SparkAdapterSupport}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
-import org.apache.hudi.common.config.{HoodieMetadataConfig, RecordMergeMode}
+import org.apache.hudi.common.config.RecordMergeMode
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
import org.apache.hudi.common.util.Option
@@ -125,7 +125,7 @@ class TestBasicSchemaEvolution extends
HoodieSparkClientTestBase with ScalaAsser
tableMetaClient.reloadActiveTimeline()
val resolver = new TableSchemaResolver(tableMetaClient)
- val latestTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false))
+ val latestTableSchema = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(resolver.getTableSchema(false))
val df =
spark.read.format("org.apache.hudi")
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index a49591f429b9..6473bb3de343 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -17,7 +17,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils, QuickstartUtils, ScalaAssertionSupport}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSchemaConversionUtils, HoodieSparkUtils, QuickstartUtils, ScalaAssertionSupport}
import org.apache.hudi.DataSourceWriteOptions.{INLINE_CLUSTERING_ENABLE,
KEYGENERATOR_CLASS_NAME}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.QuickstartUtils.{convertToStringList,
getQuickstartWriteConfigs}
@@ -696,12 +696,12 @@ class TestCOWDataSource extends HoodieSparkClientTestBase
with ScalaAssertionSup
val tableMetaClient = createMetaClient(spark, basePath)
assertFalse(tableMetaClient.getArchivedTimeline.empty())
- val actualSchema = new TableSchemaResolver(tableMetaClient).getTableAvroSchema(false)
+ val actualSchema = new TableSchemaResolver(tableMetaClient).getTableSchema(false)
val (structName, nameSpace) =
AvroConversionUtils.getAvroRecordNameAndNamespace(CommonOptionUtils.commonOpts(HoodieWriteConfig.TBL_NAME.key))
spark.sparkContext.getConf.registerKryoClasses(
Array(classOf[org.apache.avro.generic.GenericData],
classOf[org.apache.avro.Schema]))
- val schema = AvroConversionUtils.convertStructTypeToAvroSchema(structType, structName, nameSpace)
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, structName, nameSpace)
assertTrue(actualSchema != null)
assertEquals(schema, actualSchema)
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
index e5ff78e4488d..12712b4db617 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndexWithSQL.scala
@@ -18,7 +18,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex}
+import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport, DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex, HoodieSchemaConversionUtils}
import org.apache.hudi.DataSourceWriteOptions.{DELETE_OPERATION_OPT_VAL,
RECORDKEY_FIELD}
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
@@ -485,7 +485,7 @@ class TestColumnStatsIndexWithSQL extends
ColumnStatIndexTestBase {
var fileIndex = HoodieFileIndex(spark, metaClient, None, commonOpts +
("path" -> basePath), includeLogFiles = true)
val metadataConfig =
HoodieMetadataConfig.newBuilder.withMetadataIndexColumnStats(true).enable(true).build
- val hoodieSchema = HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(fileIndex.schema, "record", ""))
+ val hoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(fileIndex.schema, "record", "")
val cis = new ColumnStatsIndexSupport(spark, fileIndex.schema,
hoodieSchema, metadataConfig, metaClient)
// unpartitioned table - get all file slices
val fileSlices = fileIndex.prunePartitionsAndGetFileSlices(Seq.empty,
Seq())
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index ec8ba1d8e54d..85f20ad3050f 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -19,7 +19,7 @@
package org.apache.hudi.functional
-import org.apache.hudi.{AvroConversionUtils, ColumnStatsIndexSupport,
DataSourceReadOptions, DataSourceWriteOptions, HoodieFileIndex,
PartitionStatsIndexSupport}
+import org.apache.hudi.{ColumnStatsIndexSupport, DataSourceReadOptions,
DataSourceWriteOptions, HoodieFileIndex, HoodieSchemaConversionUtils,
PartitionStatsIndexSupport}
import org.apache.hudi.DataSourceWriteOptions.{BULK_INSERT_OPERATION_OPT_VAL,
MOR_TABLE_TYPE_OPT_VAL, PARTITIONPATH_FIELD, UPSERT_OPERATION_OPT_VAL}
import org.apache.hudi.avro.model.HoodieCleanMetadata
import org.apache.hudi.client.SparkRDDWriteClient
@@ -28,7 +28,6 @@ import
org.apache.hudi.client.transaction.SimpleConcurrentFileWritesConflictReso
import org.apache.hudi.client.transaction.lock.InProcessLockProvider
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile,
HoodieFailedWritesCleaningPolicy, HoodieTableType, WriteConcurrencyMode,
WriteOperationType}
-import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieInstant
import
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAvroMetadataLegacy
@@ -465,7 +464,7 @@ class TestPartitionStatsIndex extends
PartitionStatsIndexTestBase {
val partitionStatsIndex = new PartitionStatsIndexSupport(
spark,
latestDf.schema,
-
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(latestDf.schema,
"record", "")),
+
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(latestDf.schema,
"record", ""),
HoodieMetadataConfig.newBuilder()
.enable(true)
.build(),
@@ -476,7 +475,7 @@ class TestPartitionStatsIndex extends
PartitionStatsIndexTestBase {
.collectAsList()
assertTrue(partitionStats.size() > 0)
// Assert column stats after restore.
- val hoodieSchema =
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(latestDf.schema,
"record", ""))
+ val hoodieSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(latestDf.schema,
"record", "")
val columnStatsIndex = new ColumnStatsIndexSupport(
spark, latestDf.schema,
hoodieSchema,
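
As in the column-stats test above, the stats-index supports take the Spark StructType alongside its HoodieSchema counterpart, so both now come from a single convertStructTypeToHoodieSchema call instead of wrapping an Avro schema with HoodieSchema.fromAvroSchema. A hedged sketch of that wiring, with the SparkSession, DataFrame, and meta client left as stand-ins for the test fixtures:

    import org.apache.hudi.{ColumnStatsIndexSupport, HoodieSchemaConversionUtils}
    import org.apache.hudi.common.config.HoodieMetadataConfig
    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Sketch: the DataFrame's StructType and its HoodieSchema counterpart
    // come from one conversion call before being handed to the index support.
    def buildColumnStatsIndex(spark: SparkSession, df: DataFrame,
                              metaClient: HoodieTableMetaClient): ColumnStatsIndexSupport = {
      val hoodieSchema =
        HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema, "record", "")
      new ColumnStatsIndexSupport(
        spark,
        df.schema,
        hoodieSchema,
        HoodieMetadataConfig.newBuilder().enable(true).build(),
        metaClient)
    }
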
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
index 9dbe116dc27b..5c8ea92be008 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/util/TestSparkInternalSchemaConverter.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.internal.schema.convert.TestInternalSchemaConverter._
import org.apache.hudi.testutils.HoodieSparkClientTestHarness
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
import org.apache.spark.sql.types._
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test
class TestSparkInternalSchemaConverter extends HoodieSparkClientTestHarness
with SparkAdapterSupport {
private def getStructType(schema: HoodieSchema): DataType = {
- HoodieSparkAvroSchemaConverters.toSqlType(schema.toAvroSchema)._1
+ HoodieSparkSchemaConverters.toSqlType(schema)._1
}
@Test
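
Here the converter object itself changes: HoodieSparkSchemaConverters.toSqlType accepts the HoodieSchema directly, so the toAvroSchema bridge in the test helper goes away. A small sketch of the updated helper, assuming toSqlType keeps returning a tuple whose first element is the Catalyst DataType (as the ._1 access implies):

    import org.apache.hudi.common.schema.HoodieSchema
    import org.apache.spark.sql.avro.HoodieSparkSchemaConverters
    import org.apache.spark.sql.types.DataType

    // Mirrors the patched test helper: HoodieSchema straight to a Catalyst DataType,
    // with no intermediate Avro Schema object.
    def toCatalystType(schema: HoodieSchema): DataType =
      HoodieSparkSchemaConverters.toSqlType(schema)._1
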
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
index dfd97349a8ba..4a488ca1a0eb 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
@@ -80,8 +80,8 @@ object AvroSerDerBenchmark extends HoodieBenchmarkBase {
spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
benchmark.addCase("deserialize avro Record to internalRow") { _ =>
testRdd.mapPartitions { iter =>
- val schema =
AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my")
- val avroToRowConverter =
AvroConversionUtils.createAvroToInternalRowConverter(schema, sparkSchema)
+ val schema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sparkSchema,
"record", "my")
+ val avroToRowConverter =
AvroConversionUtils.createAvroToInternalRowConverter(schema.toAvroSchema,
sparkSchema)
iter.map(record => avroToRowConverter.apply(record).get)
}.foreach(f => f)
}
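
The benchmark shows the mixed case: schema construction moves to the new util, while createAvroToInternalRowConverter still consumes an Avro Schema, so toAvroSchema bridges the two at the call site. A sketch under those assumptions; the names mirror the benchmark rather than any new API:

    import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils}
    import org.apache.spark.sql.types.StructType

    // Sketch of the bridge: the new util yields a HoodieSchema, and the
    // Avro-to-InternalRow converter is fed the unwrapped Avro schema.
    def buildRowConverter(sparkSchema: StructType) = {
      val hoodieSchema =
        HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sparkSchema, "record", "my")
      AvroConversionUtils.createAvroToInternalRowConverter(hoodieSchema.toAvroSchema, sparkSchema)
    }
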
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
index dbce843fa936..4f2bbb4b5b75 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark3Adapter.scala
@@ -17,22 +17,20 @@
package org.apache.spark.sql.adapter
-import org.apache.hudi.{AvroConversionUtils, DefaultSource,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
Spark3HoodiePartitionCDCFileGroupMapping, Spark3HoodiePartitionFileSliceMapping}
+import org.apache.hudi.{DefaultSource, HoodiePartitionCDCFileGroupMapping,
HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils,
Spark3HoodiePartitionCDCFileGroupMapping, Spark3HoodiePartitionFileSliceMapping}
import org.apache.hudi.client.model.{HoodieInternalRow,
Spark3HoodieInternalRow}
import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.spark.internal.ReflectUtil
-import org.apache.hudi.storage.StoragePath
-import org.apache.avro.Schema
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Column, DataFrame,
DataFrameUtil, Dataset, HoodieUnsafeUtils, HoodieUTF8StringFactory,
Spark3DataFrameUtil, Spark3HoodieUnsafeUtils, Spark3HoodieUTF8StringFactory,
SparkSession, SQLContext}
import
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -101,9 +99,9 @@ abstract class BaseSpark3Adapter extends SparkAdapter with
Logging {
override def createRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
- schema: Schema,
+ schema: HoodieSchema,
parameters: java.util.Map[String, String]):
BaseRelation = {
- val dataSchema =
Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull
+ val dataSchema =
Option(schema).map(HoodieSchemaConversionUtils.convertHoodieSchemaToStructType).orNull
DefaultSource.createRelation(sqlContext, metaClient, dataSchema,
parameters.asScala.toMap)
}
diff --git
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
index 1d0391aa219c..58ed3eb5b88c 100644
---
a/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
+++
b/hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala
@@ -17,24 +17,22 @@
package org.apache.spark.sql.adapter
-import org.apache.hudi.{AvroConversionUtils, DefaultSource,
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping,
Spark4HoodiePartitionCDCFileGroupMapping, Spark4HoodiePartitionFileSliceMapping}
+import org.apache.hudi.{DefaultSource, HoodiePartitionCDCFileGroupMapping,
HoodiePartitionFileSliceMapping, HoodieSchemaConversionUtils,
Spark4HoodiePartitionCDCFileGroupMapping, Spark4HoodiePartitionFileSliceMapping}
import org.apache.hudi.client.model.{HoodieInternalRow,
Spark4HoodieInternalRow}
import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.spark.internal.ReflectUtil
import org.apache.hudi.storage.StorageConfiguration
-import org.apache.hudi.storage.StoragePath
-import org.apache.avro.Schema
import org.apache.parquet.schema.MessageType
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{AnalysisException, Column, DataFrame,
DataFrameUtil, ExpressionColumnNodeWrapper, HoodieUnsafeUtils,
HoodieUTF8StringFactory, Spark4DataFrameUtil, Spark4HoodieUnsafeUtils,
Spark4HoodieUTF8StringFactory, SparkSession, SQLContext}
import
org.apache.spark.sql.FileFormatUtilsForFileGroupReader.applyFiltersToPlan
-import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters,
HoodieSparkAvroSchemaConverters, HoodieSparkSchemaConverters}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -106,9 +104,9 @@ abstract class BaseSpark4Adapter extends SparkAdapter with
Logging {
override def createRelation(sqlContext: SQLContext,
metaClient: HoodieTableMetaClient,
- schema: Schema,
+ schema: HoodieSchema,
parameters: java.util.Map[String, String]):
BaseRelation = {
- val dataSchema =
Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull
+ val dataSchema =
Option(schema).map(HoodieSchemaConversionUtils.convertHoodieSchemaToStructType).orNull
DefaultSource.createRelation(sqlContext, metaClient, dataSchema,
parameters.asScala.toMap)
}
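
Both the Spark 3 and Spark 4 adapters now accept a HoodieSchema in createRelation and convert it only when a schema is actually supplied, preserving the old behaviour that a null schema means the relation resolves the table schema itself. A sketch of that null-tolerant conversion, pulled out of the adapter for illustration:

    import org.apache.hudi.HoodieSchemaConversionUtils
    import org.apache.hudi.common.schema.HoodieSchema
    import org.apache.spark.sql.types.StructType

    // Mirrors both adapters: a null HoodieSchema stays null so
    // DefaultSource.createRelation can fall back to the table's own schema.
    def toDataSchema(schema: HoodieSchema): StructType =
      Option(schema)
        .map(HoodieSchemaConversionUtils.convertHoodieSchemaToStructType)
        .orNull
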
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index b17116374bff..02fc43dd37fc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -18,8 +18,8 @@
package org.apache.hudi.utilities;
-import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceReadOptions;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.PartitionStatsIndexSupport;
import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -67,7 +67,6 @@ import
org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.FileFormatUtils;
-import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieDataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
@@ -83,6 +82,7 @@ import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.util.FileIOUtils;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieIndexVersion;
import org.apache.hudi.metadata.HoodieMetadataPayload;
@@ -1032,7 +1032,7 @@ public class HoodieMetadataTableValidator implements
Serializable {
PartitionStatsIndexSupport partitionStatsIndexSupport = new
PartitionStatsIndexSupport(engineContext.getSqlContext().sparkSession(),
-
AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema().toAvroSchema()),
+
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(metadataTableBasedContext.getSchema()),
metadataTableBasedContext.getSchema(),
metadataTableBasedContext.getMetadataConfig(),
metaClientOpt.get(), false);
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
index 6c7b5c1b810a..6c58fb2739dc 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/HiveSchemaProvider.java
@@ -19,8 +19,9 @@
package org.apache.hudi.utilities.schema;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.utilities.config.HiveSchemaProviderConfig;
import org.apache.hudi.utilities.exception.HoodieSchemaFetchException;
@@ -43,8 +44,8 @@ import static
org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
*/
public class HiveSchemaProvider extends SchemaProvider {
- private final Schema sourceSchema;
- private Schema targetSchema;
+ private final HoodieSchema sourceSchema;
+ private HoodieSchema targetSchema;
public HiveSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
super(props, jssc);
@@ -57,7 +58,7 @@ public class HiveSchemaProvider extends SchemaProvider {
try {
TableIdentifier sourceSchemaTable = new
TableIdentifier(sourceSchemaTableName,
scala.Option.apply(sourceSchemaDatabaseName));
StructType sourceSchema =
spark.sessionState().catalog().getTableMetadata(sourceSchemaTable).schema();
- this.sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+ this.sourceSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
sourceSchema,
sourceSchemaTableName,
"hoodie." + sourceSchemaDatabaseName);
@@ -72,7 +73,7 @@ public class HiveSchemaProvider extends SchemaProvider {
try {
TableIdentifier targetSchemaTable = new
TableIdentifier(targetSchemaTableName,
scala.Option.apply(targetSchemaDatabaseName));
StructType targetSchema =
spark.sessionState().catalog().getTableMetadata(targetSchemaTable).schema();
- this.targetSchema = AvroConversionUtils.convertStructTypeToAvroSchema(
+ this.targetSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(
targetSchema,
targetSchemaTableName,
"hoodie." + targetSchemaDatabaseName);
@@ -84,13 +85,13 @@ public class HiveSchemaProvider extends SchemaProvider {
@Override
public Schema getSourceSchema() {
- return sourceSchema;
+ return sourceSchema.toAvroSchema();
}
@Override
public Schema getTargetSchema() {
if (targetSchema != null) {
- return targetSchema;
+ return targetSchema.toAvroSchema();
} else {
return super.getTargetSchema();
}
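
The provider keeps HoodieSchema fields internally and only unwraps to Avro where the existing SchemaProvider contract still returns org.apache.avro.Schema. A condensed sketch of that shape; spark.table here is an illustrative simplification of the provider's TableIdentifier/catalog lookup, and the database and table names are placeholders:

    import org.apache.hudi.HoodieSchemaConversionUtils
    import org.apache.hudi.common.schema.HoodieSchema
    import org.apache.spark.sql.SparkSession

    // Sketch only: resolve a table's StructType and keep it as a HoodieSchema,
    // converting back to Avro only at the legacy SchemaProvider boundary.
    def loadTableSchema(spark: SparkSession, db: String, table: String): HoodieSchema = {
      val structType = spark.table(s"$db.$table").schema
      HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, table, "hoodie." + db)
    }
    // Legacy Avro callers unwrap at the edge:
    //   loadTableSchema(spark, "default", "source_tbl").toAvroSchema
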
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
index 29504c01c006..b783ba32c4d9 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/RowBasedSchemaProvider.java
@@ -18,7 +18,7 @@
package org.apache.hudi.utilities.schema;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.avro.Schema;
@@ -44,6 +44,6 @@ public class RowBasedSchemaProvider extends SchemaProvider {
@Override
public Schema getSourceSchema() {
- return AvroConversionUtils.convertStructTypeToAvroSchema(rowStruct,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
+ return
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(rowStruct,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toAvroSchema();
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index 2704dc132a2e..d25f3082d082 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -18,13 +18,13 @@
package org.apache.hudi.utilities.sources.helpers;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
-import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -34,7 +34,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
@@ -194,8 +194,8 @@ public class TestCloudObjectsSelectorCommon extends
HoodieSparkClientTestHarness
List<Row> expected = Arrays.asList(person1, person2, person3);
List<Row> actual = result.get().collectAsList();
Assertions.assertEquals(new HashSet<>(expected), new HashSet<>(actual));
- Schema schema = new Schema.Parser().parse(new File(schemaFilePath));
- StructType expectedSchema =
AvroConversionUtils.convertAvroSchemaToStructType(schema);
+ HoodieSchema schema = HoodieSchema.parse(new
FileInputStream(schemaFilePath));
+ StructType expectedSchema =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema);
// assert final output schema matches with the source schema
Assertions.assertEquals(expectedSchema, result.get().schema(), "output
dataset schema should match source schema");
}
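
Schema files are now read straight into a HoodieSchema from an InputStream instead of going through Avro's Schema.Parser on a File. A short sketch of the same flow; the explicit stream close is an addition for illustration and not part of the patched test:

    import java.io.FileInputStream

    import org.apache.hudi.HoodieSchemaConversionUtils
    import org.apache.hudi.common.schema.HoodieSchema
    import org.apache.spark.sql.types.StructType

    // Illustrative: parse the schema file into a HoodieSchema, then derive the
    // StructType expected of the output dataset.
    def expectedStructType(schemaFilePath: String): StructType = {
      val in = new FileInputStream(schemaFilePath)
      try {
        val schema: HoodieSchema = HoodieSchema.parse(in)
        HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
      } finally in.close()
    }
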
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
index 36e53bfafc46..eb195de5a8c9 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestMercifulJsonToRowConverterBase.java
@@ -18,7 +18,7 @@
package org.apache.hudi.utilities.sources.helpers;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.MercifulJsonConverterTestBase;
import org.apache.hudi.common.schema.HoodieSchema;
@@ -686,7 +686,7 @@ public abstract class TestMercifulJsonToRowConverterBase
extends MercifulJsonCon
}
private void validateSchemaCompatibility(List<Row> rows, HoodieSchema
schema) {
- StructType rowSchema =
AvroConversionUtils.convertAvroSchemaToStructType(schema.toAvroSchema());
+ StructType rowSchema =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema);
Dataset<Row> dataset = spark.createDataFrame(rows, rowSchema);
assertDoesNotThrow(dataset::collect, "Schema validation and dataset
creation should not throw any exceptions.");
}
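
The last hunk shows the reverse direction used when materialising rows: a HoodieSchema is converted to a StructType to drive DataFrame creation. A minimal sketch of that usage, assuming the rows already conform to the converted schema:

    import org.apache.hudi.HoodieSchemaConversionUtils
    import org.apache.hudi.common.schema.HoodieSchema
    import org.apache.spark.sql.{DataFrame, Row, SparkSession}

    // Sketch of the read-path direction: HoodieSchema -> StructType -> DataFrame.
    def toDataFrame(spark: SparkSession, rows: java.util.List[Row], schema: HoodieSchema): DataFrame = {
      val structType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schema)
      spark.createDataFrame(rows, structType)
    }
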