This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 399af74b369f feat(schema): Migrate StreamSync code path and its dependencies to use HoodieSchema (#17600)
399af74b369f is described below
commit 399af74b369f17235115a1ad3632f15718b3e830
Author: Tim Brown <[email protected]>
AuthorDate: Mon Dec 22 16:25:15 2025 -0500
feat(schema): Migrate StreamSync code path and its dependencies to use HoodieSchema (#17600)
---
.../scala/org/apache/hudi/HoodieSparkUtils.scala | 58 +++++------
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 69 -------------
.../common/schema/HoodieSchemaCompatibility.java | 6 ++
.../hudi/common/schema/HoodieSchemaField.java | 5 +
.../hudi/common/schema/HoodieSchemaUtils.java | 43 ++++++++
.../hudi/common/table/TableSchemaResolver.java | 15 +++
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 55 ----------
.../hudi/common/schema/TestHoodieSchemaUtils.java | 49 +++++++++
.../common/testutils/HoodieTestDataGenerator.java | 2 +
.../testsuite/dag/nodes/SparkDeleteNode.scala | 4 +-
.../testsuite/dag/nodes/SparkInsertNode.scala | 3 +-
.../org/apache/hudi/HoodieCreateRecordUtils.scala | 3 +-
.../org/apache/hudi/TestHoodieSparkUtils.scala | 25 ++---
.../execution/benchmark/AvroSerDerBenchmark.scala | 14 +--
.../benchmark/CreateHandleBenchmark.scala | 15 ++-
.../org/apache/hudi/utilities/UtilHelpers.java | 18 ++--
.../apache/hudi/utilities/schema/SchemaSet.java | 11 +-
.../helpers/CloudObjectsSelectorCommon.java | 114 ++++++++++-----------
.../utilities/streamer/HoodieStreamerUtils.java | 19 ++--
.../utilities/streamer/SourceFormatAdapter.java | 16 +--
.../apache/hudi/utilities/streamer/StreamSync.java | 53 ++++------
.../utilities/transform/ChainedTransformer.java | 12 +--
.../ErrorTableAwareChainedTransformer.java | 4 +-
...TestHoodieDeltaStreamerSchemaEvolutionBase.java | 10 +-
...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 35 ++++---
.../deltastreamer/TestSourceFormatAdapter.java | 10 +-
.../functional/TestChainedTransformer.java | 14 +--
.../utilities/sources/TestGenericRddTransform.java | 6 +-
.../utilities/testutils/SanitizationTestUtils.java | 38 +++----
29 files changed, 346 insertions(+), 380 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 97dfb1b3d5c1..153aca96a154 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -19,9 +19,11 @@
package org.apache.hudi
import org.apache.hudi.HoodieConversionUtils.toScalaOption
-import org.apache.hudi.avro.{AvroSchemaUtils, HoodieAvroUtils}
+import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
+import org.apache.hudi.common.util.{Option => HOption}
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.storage.StoragePath
@@ -38,8 +40,8 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils.getTimeZone
import org.apache.spark.sql.execution.SQLConfInjectingRDD
import org.apache.spark.sql.execution.datasources.SparkParsePartitionUtil
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType, TimestampType}
-import org.apache.spark.sql.{DataFrame, HoodieUnsafeUtils}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.DataFrame
import org.apache.spark.unsafe.types.UTF8String
import scala.collection.JavaConverters._
@@ -76,27 +78,22 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
*/
@Deprecated
def createRdd(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
- latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()): RDD[GenericRecord] = {
+ latestTableSchema: HOption[HoodieSchema] = HOption.empty()): RDD[GenericRecord] = {
createRdd(df, structName, recordNamespace, toScalaOption(latestTableSchema))
}
def createRdd(df: DataFrame, structName: String, recordNamespace: String): RDD[GenericRecord] =
createRdd(df, structName, recordNamespace, None)
- def createRdd(df: DataFrame, structName: String, recordNamespace: String, readerAvroSchemaOpt: Option[Schema]): RDD[GenericRecord] = {
+ def createRdd(df: DataFrame, structName: String, recordNamespace: String, readerSchemaOpt: Option[HoodieSchema]): RDD[GenericRecord] = {
val writerSchema = df.schema
- val writerAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, recordNamespace)
- val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+ val writerHoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(writerSchema, structName, recordNamespace)
+ val readerHoodieSchema = readerSchemaOpt.getOrElse(writerHoodieSchema)
// We check whether passed in reader schema is identical to writer schema to avoid costly serde loop of
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
- val sameSchema = writerAvroSchema.equals(readerAvroSchema)
- val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema
-
- // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
- // serializer is not able to digest it
- val readerAvroSchemaStr = readerAvroSchema.toString
- val writerAvroSchemaStr = writerAvroSchema.toString
+ val sameSchema = writerHoodieSchema.equals(readerHoodieSchema)
+ val nullable = writerHoodieSchema.isNullable
// NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion
// Additionally, we have to explicitly wrap around resulting [[RDD]] into the one
@@ -106,17 +103,15 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
if (rows.isEmpty) {
Iterator.empty
} else {
- val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
val transform: GenericRecord => GenericRecord =
if (sameSchema) identity
else {
- HoodieAvroUtils.rewriteRecordDeep(_, readerAvroSchema)
+ HoodieAvroUtils.rewriteRecordDeep(_, readerHoodieSchema.toAvroSchema)
}
// Since caller might request to get records in a different ("evolved") schema, we will be rewriting from
// existing Writer's schema into Reader's (avro) schema
- val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
- val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
+ val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerHoodieSchema.toAvroSchema, nullable = nullable)
rows.map { ir => transform(convert(ir)) }
}
@@ -137,9 +132,9 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
}
def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
- latestTableSchema: org.apache.hudi.common.util.Option[Schema] = org.apache.hudi.common.util.Option.empty()):
+ latestTableSchema: HOption[HoodieSchema] = HOption.empty()):
Tuple2[RDD[GenericRecord], RDD[String]] = {
- var latestTableSchemaConverted: Option[Schema] = None
+ var latestTableSchemaConverted: Option[HoodieSchema] = None
if (latestTableSchema.isPresent && reconcileToLatestSchema) {
latestTableSchemaConverted = Some(latestTableSchema.get())
@@ -151,33 +146,26 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
safeCreateRDD(df, structName, recordNamespace, latestTableSchemaConverted);
}
- def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, readerAvroSchemaOpt: Option[Schema]):
+ def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, readerSchemaOpt: Option[HoodieSchema]):
Tuple2[RDD[GenericRecord], RDD[String]] = {
val writerSchema = df.schema
- val writerAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(writerSchema, structName, recordNamespace)
- val readerAvroSchema = readerAvroSchemaOpt.getOrElse(writerAvroSchema)
+ val writerHoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(writerSchema, structName, recordNamespace)
+ val readerHoodieSchema = readerSchemaOpt.getOrElse(writerHoodieSchema)
// We check whether passed in reader schema is identical to writer schema to avoid costly serde loop of
// making Spark deserialize its internal representation [[InternalRow]] into [[Row]] for subsequent conversion
// (and back)
- val sameSchema = writerAvroSchema.equals(readerAvroSchema)
- val nullable = AvroSchemaUtils.getNonNullTypeFromUnion(writerAvroSchema) != writerAvroSchema
+ val sameSchema = writerHoodieSchema.equals(readerHoodieSchema)
+ val nullable = writerHoodieSchema.isNullable
- // NOTE: We have to serialize Avro schema, and then subsequently parse it on the executor node, since Spark
- // serializer is not able to digest it
- val writerAvroSchemaStr = writerAvroSchema.toString
- val readerAvroSchemaStr = readerAvroSchema.toString
// NOTE: We're accessing toRdd here directly to avoid [[InternalRow]] to [[Row]] conversion
-
if (!sameSchema) {
val rdds: RDD[Either[GenericRecord, InternalRow]] = df.queryExecution.toRdd.mapPartitions { rows =>
if (rows.isEmpty) {
Iterator.empty
} else {
- val writerAvroSchema = new Schema.Parser().parse(writerAvroSchemaStr)
- val readerAvroSchema = new Schema.Parser().parse(readerAvroSchemaStr)
- val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
+ val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerHoodieSchema.toAvroSchema, nullable = nullable)
val transform: InternalRow => Either[GenericRecord, InternalRow] = internalRow => try {
- Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow), readerAvroSchema, true))
+ Left(HoodieAvroUtils.rewriteRecordDeep(convert(internalRow), readerHoodieSchema.toAvroSchema, true))
} catch {
case _: Throwable =>
Right(internalRow)
@@ -197,7 +185,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
if (rows.isEmpty) {
Iterator.empty
} else {
- val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerAvroSchema, nullable = nullable)
+ val convert = AvroConversionUtils.createInternalRowToAvroConverter(writerSchema, writerHoodieSchema.toAvroSchema, nullable = nullable)
rows.map(convert)
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index c4ea3709c115..46b98f9ca795 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -90,7 +90,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.TreeMap;
-import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -1370,74 +1369,6 @@ public class HoodieAvroUtils {
scale, new MathContext(precision, RoundingMode.HALF_UP));
}
- public static boolean hasDecimalField(Schema schema) {
- return hasDecimalWithCondition(schema, unused -> true);
- }
-
- /**
- * Checks whether the provided schema contains a decimal with a precision less than or equal to 18,
- * which allows the decimal to be stored as int/long instead of a fixed size byte array in
- * <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md">parquet logical types</a>
- * @param schema the input schema to search
- * @return true if the schema contains a small precision decimal field and false otherwise
- */
- public static boolean hasSmallPrecisionDecimalField(Schema schema) {
- return hasDecimalWithCondition(schema, HoodieAvroUtils::isSmallPrecisionDecimalField);
- }
-
- private static boolean hasDecimalWithCondition(Schema schema, Function<Decimal, Boolean> condition) {
- switch (schema.getType()) {
- case RECORD:
- for (Field field : schema.getFields()) {
- if (hasDecimalWithCondition(field.schema(), condition)) {
- return true;
- }
- }
- return false;
- case ARRAY:
- return hasDecimalWithCondition(schema.getElementType(), condition);
- case MAP:
- return hasDecimalWithCondition(schema.getValueType(), condition);
- case UNION:
- return hasDecimalWithCondition(getActualSchemaFromUnion(schema, null), condition);
- default:
- if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
- Decimal decimal = (Decimal) schema.getLogicalType();
- return condition.apply(decimal);
- } else {
- return false;
- }
- }
- }
-
- private static boolean isSmallPrecisionDecimalField(Decimal decimal) {
- return decimal.getPrecision() <= 18;
- }
-
- /**
- * Checks whether the provided schema contains a list or map field.
- * @param schema input
- * @return true if a list or map is present, false otherwise
- */
- public static boolean hasListOrMapField(Schema schema) {
- switch (schema.getType()) {
- case RECORD:
- for (Field field : schema.getFields()) {
- if (hasListOrMapField(field.schema())) {
- return true;
- }
- }
- return false;
- case ARRAY:
- case MAP:
- return true;
- case UNION:
- return hasListOrMapField(getActualSchemaFromUnion(schema, null));
- default:
- return false;
- }
- }
-
/**
* Avro does not support type promotion from numbers to string. This function returns true if
* it will be necessary to rewrite the record to support this promotion.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java
index ab2f04b2ae9e..59e830cd555f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaCompatibility.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.internal.schema.HoodieSchemaException;
+import org.apache.avro.SchemaCompatibility;
+
import java.util.Collections;
import java.util.Set;
@@ -46,6 +48,10 @@ public final class HoodieSchemaCompatibility {
private HoodieSchemaCompatibility() {
}
+ public static boolean areSchemasCompatible(HoodieSchema tableSchema, HoodieSchema writerSchema) {
+ return SchemaCompatibility.checkReaderWriterCompatibility(tableSchema.toAvroSchema(), writerSchema.toAvroSchema()).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
+ }
+
/**
* Checks if writer schema is compatible with table schema for write operations.
* This is equivalent to AvroSchemaUtils.checkSchemaCompatible() but operates on HoodieSchemas.
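
As a rough, hypothetical sketch of how the new areSchemasCompatible helper might be called (the schema literals below are invented for illustration; the check delegates to Avro's reader/writer compatibility, so a writer schema that only adds a nullable field with a default passes):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaCompatibility;

    public class SchemaCompatibilitySketch {
      public static void main(String[] args) {
        // Table (reader) schema with a single required field.
        HoodieSchema tableSchema = HoodieSchema.parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"}]}");
        // Writer schema adds a nullable field with a default, which Avro
        // treats as reader/writer compatible.
        HoodieSchema writerSchema = HoodieSchema.parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"ts\",\"type\":[\"null\",\"long\"],\"default\":null}]}");
        if (!HoodieSchemaCompatibility.areSchemasCompatible(tableSchema, writerSchema)) {
          throw new IllegalStateException("writer schema incompatible with table schema");
        }
      }
    }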
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java
index aa5782ff1d57..b2671cbc57a0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaField.java
@@ -27,6 +27,7 @@ import org.apache.avro.Schema;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
/**
* Wrapper class for Avro Schema.Field that provides Hudi-specific field functionality
@@ -237,6 +238,10 @@ public class HoodieSchemaField implements Serializable {
avroField.addProp(key, value);
}
+ public Set<String> aliases() {
+ return avroField.aliases();
+ }
+
/**
* Returns the underlying Avro field for compatibility purposes.
*
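
A hedged usage sketch for the new aliases() accessor (not from this commit; the schema literal is made up), mirroring the alias checks in CloudObjectsSelectorCommon further down in this change:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;

    public class FieldAliasSketch {
      public static void main(String[] args) {
        HoodieSchema schema = HoodieSchema.parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"user_id\",\"type\":\"string\",\"aliases\":[\"uid\"]},"
                + "{\"name\":\"amount\",\"type\":\"double\"}]}");
        for (HoodieSchemaField field : schema.getFields()) {
          // aliases() surfaces the underlying Avro field aliases as a Set<String>
          if (!field.aliases().isEmpty()) {
            System.out.println(field.name() + " -> " + field.aliases());
          }
        }
      }
    }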
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
index 0c7034434260..cd0be22fd52e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java
@@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -681,4 +682,46 @@ public final class HoodieSchemaUtils {
// Delegate to AvroSchemaUtils
return AvroSchemaUtils.getAvroRecordQualifiedName(tableName);
}
+
+ public static boolean hasDecimalField(HoodieSchema schema) {
+ return hasDecimalWithCondition(schema, unused -> true);
+ }
+
+ /**
+ * Checks whether the provided schema contains a decimal with a precision less than or equal to 18,
+ * which allows the decimal to be stored as int/long instead of a fixed size byte array in
+ * <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md">parquet logical types</a>
+ * @param schema the input schema to search
+ * @return true if the schema contains a small precision decimal field and false otherwise
+ */
+ public static boolean hasSmallPrecisionDecimalField(HoodieSchema schema) {
+ return hasDecimalWithCondition(schema, HoodieSchemaUtils::isSmallPrecisionDecimalField);
+ }
+
+ private static boolean hasDecimalWithCondition(HoodieSchema schema, Function<HoodieSchema.Decimal, Boolean> condition) {
+ switch (schema.getType()) {
+ case RECORD:
+ for (HoodieSchemaField field : schema.getFields()) {
+ if (hasDecimalWithCondition(field.schema(), condition)) {
+ return true;
+ }
+ }
+ return false;
+ case ARRAY:
+ return hasDecimalWithCondition(schema.getElementType(), condition);
+ case MAP:
+ return hasDecimalWithCondition(schema.getValueType(), condition);
+ case UNION:
+ return hasDecimalWithCondition(schema.getNonNullType(), condition);
+ case DECIMAL:
+ HoodieSchema.Decimal decimal = (HoodieSchema.Decimal) schema;
+ return condition.apply(decimal);
+ default:
+ return false;
+ }
+ }
+
+ private static boolean isSmallPrecisionDecimalField(HoodieSchema.Decimal decimal) {
+ return decimal.getPrecision() <= 18;
+ }
}
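
To illustrate the two new helpers, a minimal sketch (schema literal invented; assumes HoodieSchema.parse accepts standard Avro JSON, as the tests below do): a decimal with precision 8 is both a decimal field and a "small precision" one, since 8 <= 18 means the value fits the int/long Parquet encoding instead of a fixed-size byte array.

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaUtils;

    public class DecimalFieldSketch {
      public static void main(String[] args) {
        HoodieSchema schema = HoodieSchema.parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"amount\",\"type\":{\"type\":\"bytes\","
                + "\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}}]}");
        // true: the record contains a decimal somewhere in its tree
        System.out.println(HoodieSchemaUtils.hasDecimalField(schema));
        // true: precision 8 <= 18, so int/long storage is possible in Parquet
        System.out.println(HoodieSchemaUtils.hasSmallPrecisionDecimalField(schema));
      }
    }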
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 893dac2ba464..e879c8f8d222 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -350,6 +350,21 @@ public class TableSchemaResolver {
return Option.empty();
}
+ /**
+ * Returns table's latest {@link HoodieSchema} iff table is non-empty (ie there's at least
+ * a single commit)
+ *
+ * This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fallback
+ * to use table's schema used at creation
+ */
+ public Option<HoodieSchema> getTableSchemaFromLatestCommit(boolean includeMetadataFields) {
+ if (metaClient.isTimelineNonEmpty()) {
+ return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).map(HoodieSchema::fromAvroSchema);
+ }
+
+ return Option.empty();
+ }
+
/**
* Read schema from a data file from the last compaction commit done.
*
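
A sketch of calling the new resolver method (not part of the commit; the metaClient parameter is assumed to point at an existing table), showing the empty-table behaviour the javadoc calls out:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.common.util.Option;

    public class LatestCommitSchemaSketch {
      static Option<HoodieSchema> latestSchema(HoodieTableMetaClient metaClient) {
        TableSchemaResolver resolver = new TableSchemaResolver(metaClient);
        // Returns Option.empty() when the timeline has no commits; unlike
        // getTableAvroSchema(boolean), it never falls back to the schema
        // recorded at table creation.
        return resolver.getTableSchemaFromLatestCommit(false);
      }
    }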
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 245900a11f5f..ab57ebb4bea5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -63,7 +63,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -122,7 +121,6 @@ import static org.apache.hudi.avro.HoodieAvroWrapperUtils.wrapValueIntoAvro;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -941,32 +939,6 @@ public class TestHoodieAvroUtils {
assertEquals(decfield, after);
}
- @Test
- void testHasDecimalField() {
- assertTrue(HoodieAvroUtils.hasDecimalField(new Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
- assertFalse(HoodieAvroUtils.hasDecimalField(new Schema.Parser().parse(EVOLVED_SCHEMA)));
- assertFalse(HoodieAvroUtils.hasDecimalField(new Schema.Parser().parse(SCHEMA_WITH_NON_NULLABLE_FIELD)));
- assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_SCHEMA));
- assertTrue(HoodieAvroUtils.hasDecimalField(HoodieTestDataGenerator.AVRO_TRIP_ENCODED_DECIMAL_SCHEMA));
- Schema recordWithMapAndArray = Schema.createRecord("recordWithMapAndArray", null, null, false,
- Arrays.asList(new Schema.Field("mapfield", Schema.createMap(Schema.create(Schema.Type.INT)), null, null),
- new Schema.Field("arrayfield", Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
- ));
- assertFalse(HoodieAvroUtils.hasDecimalField(recordWithMapAndArray));
- Schema recordWithDecMapAndArray = Schema.createRecord("recordWithDecMapAndArray", null, null, false,
- Arrays.asList(new Schema.Field("mapfield",
- Schema.createMap(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))), null, null),
- new Schema.Field("arrayfield", Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
- ));
- assertTrue(HoodieAvroUtils.hasDecimalField(recordWithDecMapAndArray));
- Schema recordWithMapAndDecArray = Schema.createRecord("recordWithMapAndDecArray", null, null, false,
- Arrays.asList(new Schema.Field("mapfield",
- Schema.createMap(Schema.create(Schema.Type.INT)), null, null), new Schema.Field("arrayfield",
- Schema.createArray(LogicalTypes.decimal(10,6).addToSchema(Schema.create(Schema.Type.BYTES))), null, null)
- ));
- assertTrue(HoodieAvroUtils.hasDecimalField(recordWithMapAndDecArray));
- }
-
@Test
void testCreateFullName() {
String result = HoodieAvroUtils.createFullName(new ArrayDeque<>(Arrays.asList("a", "b", "c")));
@@ -1005,33 +977,6 @@ public class TestHoodieAvroUtils {
}
}
- @Test
- void testHasListOrMapField() {
- Schema nestedList = Schema.createRecord("nestedList", null, null, false, Arrays.asList(
- new Schema.Field("intField", Schema.create(Schema.Type.INT), null, null),
- new Schema.Field("nested", Schema.createRecord("nestedSchema", null, null, false, Collections.singletonList(
- new Schema.Field("listField", Schema.createArray(Schema.create(Schema.Type.INT)), null, null)
- )), null, null)
- ));
- Schema nestedMap = Schema.createRecord("nestedMap", null, null, false, Arrays.asList(
- new Schema.Field("intField", Schema.create(Schema.Type.INT), null, null),
- new Schema.Field("nested", Schema.createUnion(Schema.create(Schema.Type.NULL),
- Schema.createRecord("nestedSchema", null, null, false,
- Collections.singletonList(new Schema.Field("mapField", Schema.createMap(Schema.create(Schema.Type.INT)), null, null)
- ))), null, null)
- ));
- assertTrue(HoodieAvroUtils.hasListOrMapField(nestedList));
- assertTrue(HoodieAvroUtils.hasListOrMapField(nestedMap));
- assertFalse(HoodieAvroUtils.hasListOrMapField(new Schema.Parser().parse(EXAMPLE_SCHEMA)));
- }
-
- @Test
- void testHasSmallPrecisionDecimalField() {
- assertTrue(HoodieAvroUtils.hasSmallPrecisionDecimalField(new Schema.Parser().parse(SCHEMA_WITH_DECIMAL_FIELD)));
- assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new Schema.Parser().parse(SCHEMA_WITH_AVRO_TYPES_STR)));
- assertFalse(HoodieAvroUtils.hasSmallPrecisionDecimalField(new Schema.Parser().parse(EXAMPLE_SCHEMA)));
- }
-
public static Stream<Arguments> getSchemaForFieldParams() {
Object[][] data =
new Object[][] {
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
index b9c91249a8f2..e29ee000f766 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaUtils.java
@@ -19,6 +19,7 @@
package org.apache.hudi.common.schema;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
@@ -99,6 +100,15 @@ public class TestHoodieSchemaUtils {
+ "{\"name\":\"localTimestampMicrosField\",\"type\":\"long\",\"logicalType\":\"local-timestamp-micros\"}"
+ "]}";
+ private static final String SCHEMA_WITH_NON_NULLABLE_FIELD =
+ "{\"type\": \"record\",\"name\": \"testrec3\",\"fields\": [ "
+ + "{\"name\": \"timestamp\",\"type\": \"double\"},{\"name\": \"_row_key\", \"type\": \"string\"},"
+ + "{\"name\": \"non_pii_col\", \"type\": \"string\"},"
+ + "{\"name\": \"pii_col\", \"type\": \"string\", \"column_category\": \"user_profile\"},"
+ + "{\"name\": \"nullable_field\",\"type\": [\"null\" ,\"string\"],\"default\": null},"
+ + "{\"name\": \"non_nullable_field_wo_default\",\"type\": \"string\"},"
+ + "{\"name\": \"non_nullable_field_with_default\",\"type\": \"string\", \"default\": \"dummy\"}]}";
+
private static String SCHEMA_WITH_NESTED_FIELD_LARGE_STR = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+ "{\"name\":\"firstname\",\"type\":\"string\"},"
+ "{\"name\":\"lastname\",\"type\":\"string\"},"
@@ -106,6 +116,11 @@ public class TestHoodieSchemaUtils {
+ "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+ "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
+ private static final String SCHEMA_WITH_DECIMAL_FIELD = "{\"type\":\"record\",\"name\":\"record\",\"fields\":["
+ + "{\"name\":\"key_col\",\"type\":[\"null\",\"int\"],\"default\":null},"
+ + "{\"name\":\"decimal_col\",\"type\":[\"null\","
+ + "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":8,\"scale\":4}],\"default\":null}]}";
+
private static HoodieSchema SCHEMA_WITH_NESTED_FIELD_LARGE = HoodieSchema.parse(SCHEMA_WITH_NESTED_FIELD_LARGE_STR);
@Test
@@ -1539,4 +1554,38 @@ public class TestHoodieSchemaUtils {
assertTrue(result instanceof LocalDate);
assertEquals(LocalDate.of(2023, 1, 1), result);
}
+
+ @Test
+ void testHasDecimalField() {
+ assertTrue(HoodieSchemaUtils.hasDecimalField(HoodieSchema.parse(SCHEMA_WITH_DECIMAL_FIELD)));
+ assertFalse(HoodieSchemaUtils.hasDecimalField(HoodieSchema.parse(EVOLVED_SCHEMA)));
+ assertFalse(HoodieSchemaUtils.hasDecimalField(HoodieSchema.parse(SCHEMA_WITH_NON_NULLABLE_FIELD)));
+ assertTrue(HoodieSchemaUtils.hasDecimalField(HoodieTestDataGenerator.HOODIE_SCHEMA));
+ assertTrue(HoodieSchemaUtils.hasDecimalField(HoodieTestDataGenerator.HOODIE_TRIP_ENCODED_DECIMAL_SCHEMA));
+ HoodieSchema recordWithMapAndArray = HoodieSchema.createRecord("recordWithMapAndArray", null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("mapfield", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)), null, null),
+ HoodieSchemaField.of("arrayfield", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT)), null, null)
+ ));
+ assertFalse(HoodieSchemaUtils.hasDecimalField(recordWithMapAndArray));
+ HoodieSchema recordWithDecMapAndArray = HoodieSchema.createRecord("recordWithDecMapAndArray", null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("mapfield", HoodieSchema.createMap(HoodieSchema.createDecimal(10, 6)), null, null),
+ HoodieSchemaField.of("arrayfield", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.INT)), null, null)
+ ));
+ assertTrue(HoodieSchemaUtils.hasDecimalField(recordWithDecMapAndArray));
+ HoodieSchema recordWithMapAndDecArray = HoodieSchema.createRecord("recordWithMapAndDecArray", null, null, false,
+ Arrays.asList(
+ HoodieSchemaField.of("mapfield", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT)), null, null),
+ HoodieSchemaField.of("arrayfield", HoodieSchema.createArray(HoodieSchema.createDecimal(10, 6)), null, null)
+ ));
+ assertTrue(HoodieSchemaUtils.hasDecimalField(recordWithMapAndDecArray));
+ }
+
+ @Test
+ void testHasSmallPrecisionDecimalField() {
+ assertTrue(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(SCHEMA_WITH_DECIMAL_FIELD)));
+ assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(SCHEMA_WITH_AVRO_TYPES_STR)));
+ assertFalse(HoodieSchemaUtils.hasSmallPrecisionDecimalField(HoodieSchema.parse(EXAMPLE_SCHEMA)));
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 0fa61f3f5e9d..6a4eeb71400d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -240,11 +240,13 @@ public class HoodieTestDataGenerator implements AutoCloseable {
public static final HoodieSchema HOODIE_SCHEMA = HoodieSchema.fromAvroSchema(AVRO_SCHEMA);
public static final Schema AVRO_SCHEMA_WITH_SPECIFIC_COLUMNS = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA_WITH_PAYLOAD_SPECIFIC_COLS);
public static final Schema NESTED_AVRO_SCHEMA = new Schema.Parser().parse(TRIP_NESTED_EXAMPLE_SCHEMA);
+ public static final HoodieSchema NESTED_SCHEMA = HoodieSchema.fromAvroSchema(NESTED_AVRO_SCHEMA);
public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS = HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
public static final HoodieSchema HOODIE_SCHEMA_WITH_METADATA_FIELDS = HoodieSchema.fromAvroSchema(AVRO_SCHEMA_WITH_METADATA_FIELDS);
public static final Schema AVRO_SHORT_TRIP_SCHEMA = new Schema.Parser().parse(SHORT_TRIP_SCHEMA);
public static final Schema AVRO_TRIP_ENCODED_DECIMAL_SCHEMA = new Schema.Parser().parse(TRIP_ENCODED_DECIMAL_SCHEMA);
+ public static final HoodieSchema HOODIE_TRIP_ENCODED_DECIMAL_SCHEMA = HoodieSchema.parse(TRIP_ENCODED_DECIMAL_SCHEMA);
public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA);
public static final HoodieSchema HOODIE_SCHEMA_TRIP_LOGICAL_TYPES_SCHEMA = HoodieSchema.parse(TRIP_LOGICAL_TYPES_SCHEMA);
public static final Schema AVRO_TRIP_LOGICAL_TYPES_SCHEMA_V6 = new Schema.Parser().parse(TRIP_LOGICAL_TYPES_SCHEMA_V6);
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
index d64be904101c..5bfbca63a04b 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala
@@ -20,11 +20,11 @@ package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
-import org.apache.avro.Schema
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
import org.slf4j.LoggerFactory
@@ -58,7 +58,7 @@ class SparkDeleteNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey()
val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
- org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
+ org.apache.hudi.common.util.Option.of(HoodieSchema.parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD, context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
index e5ada9363928..f4dd17f383f2 100644
--- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
+++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala
@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.hudi.client.WriteStatus
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
@@ -61,7 +62,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey()
val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
- org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
+ org.apache.hudi.common.util.Option.of(HoodieSchema.parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD, context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index a1575af230ab..25151cda0008 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -115,8 +115,7 @@ object HoodieCreateRecordUtils {
recordType match {
case HoodieRecord.HoodieRecordType.AVRO =>
// avroRecords will contain meta fields when isPrepped is true.
- val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, recordName, recordNameSpace,
- Some(writerSchema.getAvroSchema))
+ val avroRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, recordName, recordNameSpace, Some(writerSchema))
avroRecords.mapPartitions(it => {
val sparkPartitionId = TaskContext.getPartitionId()
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
index d3e7f1b22806..0198beaf2b58 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkUtils.scala
@@ -19,6 +19,7 @@
package org.apache.hudi
import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
import org.apache.hudi.testutils.DataSourceTestUtils
import org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest
@@ -97,7 +98,7 @@ class TestHoodieSparkUtils {
val df1 = spark.createDataFrame(spark.sparkContext.parallelize(recordsSeq), structType)
var genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
- org.apache.hudi.common.util.Option.of(schema.toAvroSchema))
+ org.apache.hudi.common.util.Option.of(schema))
genRecRDD.collect()
val evolSchema = DataSourceTestUtils.getStructTypeExampleEvolvedSchema
@@ -105,13 +106,13 @@ class TestHoodieSparkUtils {
recordsSeq = convertRowListToSeq(records)
genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
- org.apache.hudi.common.util.Option.of(evolSchema.toAvroSchema))
+ org.apache.hudi.common.util.Option.of(evolSchema))
genRecRDD.collect()
// pass in evolved schema but with records serialized with old schema. should be able to convert with out any exception.
// Before https://github.com/apache/hudi/pull/2927, this will throw exception.
genRecRDD = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
- org.apache.hudi.common.util.Option.of(evolSchema.toAvroSchema))
+ org.apache.hudi.common.util.Option.of(evolSchema))
val genRecs = genRecRDD.collect()
// if this succeeds w/o throwing any exception, test succeeded.
assertEquals(genRecs.size, 5)
@@ -127,29 +128,29 @@ class TestHoodieSparkUtils {
val innerStruct1 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)
val structType1 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true)
- val schema1 = AvroConversionUtils.convertStructTypeToAvroSchema(structType1, "test_struct_name", "test_namespace")
+ val schema1 = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType1, "test_struct_name", "test_namespace")
val records1 = Seq(Row("key1", Row("innerKey1_1", 1L), Row("innerKey1_2", 2L)))
val df1 = spark.createDataFrame(spark.sparkContext.parallelize(records1), structType1)
val genRecRDD1 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
org.apache.hudi.common.util.Option.of(schema1))
- assert(schema1.equals(genRecRDD1.collect()(0).getSchema))
+ assert(schema1.toAvroSchema.equals(genRecRDD1.collect()(0).getSchema))
// create schema2 which has one addition column at the root level compared to schema1
val structType2 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct1,true)
.add("nullableInnerStruct2",innerStruct1,true)
- val schema2 = AvroConversionUtils.convertStructTypeToAvroSchema(structType2, "test_struct_name", "test_namespace")
+ val schema2 = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType2, "test_struct_name", "test_namespace")
val records2 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L), Row("innerKey2_3", 2L)))
val df2 = spark.createDataFrame(spark.sparkContext.parallelize(records2), structType2)
val genRecRDD2 = HoodieSparkUtils.createRdd(df2, "test_struct_name", "test_namespace", true,
org.apache.hudi.common.util.Option.of(schema2))
- assert(schema2.equals(genRecRDD2.collect()(0).getSchema))
+ assert(schema2.toAvroSchema.equals(genRecRDD2.collect()(0).getSchema))
// send records1 with schema2. should succeed since the new column is nullable.
val genRecRDD3 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
org.apache.hudi.common.util.Option.of(schema2))
- assert(genRecRDD3.collect()(0).getSchema.equals(schema2))
+ assert(genRecRDD3.collect()(0).getSchema.equals(schema2.toAvroSchema))
genRecRDD3.foreach(entry => assertNull(entry.get("nullableInnerStruct2")))
val innerStruct3 = new StructType().add("innerKey","string",false).add("innerValue", "long", true)
@@ -159,17 +160,17 @@ class TestHoodieSparkUtils {
val structType4 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct3,true)
- val schema4 = AvroConversionUtils.convertStructTypeToAvroSchema(structType4, "test_struct_name", "test_namespace")
+ val schema4 = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType4, "test_struct_name", "test_namespace")
val records4 = Seq(Row("key2", Row("innerKey2_1", 2L), Row("innerKey2_2", 2L, "new_nested_col_val1")))
val df4 = spark.createDataFrame(spark.sparkContext.parallelize(records4), structType4)
val genRecRDD4 = HoodieSparkUtils.createRdd(df4, "test_struct_name", "test_namespace", true,
org.apache.hudi.common.util.Option.of(schema4))
- assert(schema4.equals(genRecRDD4.collect()(0).getSchema))
+ assert(schema4.toAvroSchema.equals(genRecRDD4.collect()(0).getSchema))
// convert batch 1 with schema4. should succeed.
val genRecRDD5 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
org.apache.hudi.common.util.Option.of(schema4))
- assert(schema4.equals(genRecRDD4.collect()(0).getSchema))
+ assert(schema4.toAvroSchema.equals(genRecRDD4.collect()(0).getSchema))
val genRec = genRecRDD5.collect()(0)
val nestedRec : GenericRecord = genRec.get("nullableInnerStruct").asInstanceOf[GenericRecord]
assertNull(nestedRec.get("new_nested_col"))
@@ -182,7 +183,7 @@ class TestHoodieSparkUtils {
val structType6 = new StructType().add("key", "string", false)
.add("nonNullableInnerStruct",innerStruct1,false).add("nullableInnerStruct",innerStruct4,true)
- val schema6 = AvroConversionUtils.convertStructTypeToAvroSchema(structType6, "test_struct_name", "test_namespace")
+ val schema6 = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType6, "test_struct_name", "test_namespace")
// convert batch 1 with schema5. should fail since the missed out column is not nullable.
try {
val genRecRDD6 = HoodieSparkUtils.createRdd(df1, "test_struct_name", "test_namespace", true,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
index bb640cec019e..dfd97349a8ba 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/AvroSerDerBenchmark.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.benchmark
-import org.apache.hudi.{AvroConversionUtils, HoodieSparkUtils}
+import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils, HoodieSparkUtils}
import org.apache.spark.hudi.benchmark.{HoodieBenchmark, HoodieBenchmarkBase}
import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -55,9 +55,9 @@ object AvroSerDerBenchmark extends HoodieBenchmarkBase {
val benchmark = new HoodieBenchmark(s"perf avro serializer for hoodie", 50000000)
benchmark.addCase("serialize internalRow to avro Record") { _ =>
val df = getDataFrame(50000000)
- val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
- spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
- HoodieSparkUtils.createRdd(df,"record", "my", Some(avroSchema)).foreach(f => f)
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema, "record", "my")
+ spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
+ HoodieSparkUtils.createRdd(df,"record", "my", Some(schema)).foreach(f => f)
}
benchmark.run()
}
@@ -73,11 +73,11 @@ object AvroSerDerBenchmark extends HoodieBenchmarkBase {
val benchmark = new HoodieBenchmark(s"perf avro deserializer for hoodie", 10000000)
val df = getDataFrame(10000000)
val sparkSchema = df.schema
- val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
- val testRdd = HoodieSparkUtils.createRdd(df,"record", "my", Some(avroSchema))
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema, "record", "my")
+ val testRdd = HoodieSparkUtils.createRdd(df,"record", "my", Some(schema))
testRdd.cache()
testRdd.foreach(f => f)
- spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
+ spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
benchmark.addCase("deserialize avro Record to internalRow") { _ =>
testRdd.mapPartitions { iter =>
val schema = AvroConversionUtils.convertStructTypeToAvroSchema(sparkSchema, "record", "my")
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala
index 75e7bd02025f..2881ff145725 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/execution/benchmark/CreateHandleBenchmark.scala
@@ -18,8 +18,7 @@
package org.apache.spark.sql.execution.benchmark
-import org.apache.hudi.AvroConversionUtils
-import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.{AvroConversionUtils, HoodieSchemaConversionUtils, HoodieSparkUtils}
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
@@ -117,22 +116,22 @@ object CreateHandleBenchmark extends HoodieBenchmarkBase {
private def createHandleBenchmark: Unit = {
val benchmark = new HoodieBenchmark(s"perf create handle for hoodie", 10000)
val df = getDataFrame(100000)
- val avroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "record", "my")
- spark.sparkContext.getConf.registerAvroSchemas(avroSchema)
+ val schema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema, "record", "my")
+ spark.sparkContext.getConf.registerAvroSchemas(schema.toAvroSchema)
df.write.format("hudi").option(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "key")
.option(HoodieMetadataConfig.ENABLE.key(), "false")
.option(HoodieTableConfig.NAME.key(), "tbl_name").mode(SaveMode.Overwrite).save("/tmp/sample_test_table")
val dummpProps = new Properties()
val avroRecords: java.util.List[HoodieRecord[_]] = HoodieSparkUtils.createRdd(df, "struct_name", "name_space",
- Some(avroSchema)).mapPartitions(
+ Some(schema)).mapPartitions(
it => {
it.map { genRec =>
val hoodieKey = new HoodieKey(genRec.get("key").toString, "")
HoodieRecordUtils.createHoodieRecord(genRec, 0L, hoodieKey, classOf[DefaultHoodieRecordPayload].getName, false, null)
}
}).toJavaRDD().collect().stream().map[HoodieRecord[_]](hoodieRec => {
- hoodieRec.asInstanceOf[HoodieAvroIndexedRecord].toIndexedRecord(avroSchema, dummpProps)
+ hoodieRec.asInstanceOf[HoodieAvroIndexedRecord].toIndexedRecord(schema.toAvroSchema, dummpProps)
hoodieRec
}).collect(Collectors.toList[HoodieRecord[_]])
@@ -140,7 +139,7 @@ object CreateHandleBenchmark extends HoodieBenchmarkBase {
val props = new Properties()
props.setProperty(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), "key")
val writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp/sample_test_table").withPreCombineField("col1")
- .withSchema(avroSchema.toString)
+ .withSchema(schema.toString)
.withMarkersType(MarkerType.DIRECT.name())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
.withProps(props).build()
@@ -151,7 +150,7 @@ object CreateHandleBenchmark extends HoodieBenchmarkBase {
val createHandle = new HoodieCreateHandle(writeConfig, "000000001", hoodieTable, "", UUID.randomUUID().toString, new LocalTaskContextSupplier())
avroRecords.forEach(record => {
val newAvroRec = new HoodieAvroIndexedRecord(record.getKey, record.getData.asInstanceOf[IndexedRecord], 0L, record.getOperation)
- createHandle.write(newAvroRec, HoodieSchema.fromAvroSchema(avroSchema), writeConfig.getProps)
+ createHandle.write(newAvroRec, schema, writeConfig.getProps)
})
createHandle.close()
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 6622190cf571..e330b821cc51 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -18,7 +18,7 @@
package org.apache.hudi.utilities;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -226,12 +226,12 @@ public class UtilHelpers {
public static StructType getSourceSchema(SchemaProvider schemaProvider) {
if (schemaProvider != null && schemaProvider.getSourceHoodieSchema() != null && schemaProvider.getSourceHoodieSchema() != InputBatch.NULL_SCHEMA) {
- return AvroConversionUtils.convertAvroSchemaToStructType(schemaProvider.getSourceHoodieSchema().toAvroSchema());
+ return HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(schemaProvider.getSourceHoodieSchema());
}
return null;
}
- public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt, Supplier<Option<Schema>> sourceSchemaSupplier,
+ public static Option<Transformer> createTransformer(Option<List<String>> classNamesOpt, Supplier<Option<HoodieSchema>> sourceSchemaSupplier,
boolean isErrorTableWriterEnabled) throws IOException {
try {
@@ -535,7 +535,7 @@ public class UtilHelpers {
structType = SparkAdapterSupport$.MODULE$.sparkAdapter().getSchemaUtils()
.getSchema(conn, rs, dialect, false, false);
}
- return HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(structType, table, "hoodie." + table));
+ return HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType, table, "hoodie." + table);
}
}
} catch (HoodieException e) {
@@ -592,15 +592,15 @@ public class UtilHelpers {
return wrapSchemaProviderWithPostProcessor(rowSchemaProvider, cfg, jssc, null);
}
- public static Option<Schema> getLatestTableSchema(JavaSparkContext jssc,
- HoodieStorage storage,
- String basePath,
- HoodieTableMetaClient tableMetaClient) {
+ public static Option<HoodieSchema> getLatestTableSchema(JavaSparkContext jssc,
+ HoodieStorage storage,
+ String basePath,
+ HoodieTableMetaClient tableMetaClient) {
try {
if (FSUtils.isTableExists(basePath, storage)) {
TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(tableMetaClient);
- return tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false);
+ return tableSchemaResolver.getTableSchemaFromLatestCommit(false);
}
} catch (Exception e) {
LOG.warn("Failed to fetch latest table's schema", e);
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
index fe9340ad2e7c..6f69fc8a6c37 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SchemaSet.java
@@ -18,7 +18,8 @@
package org.apache.hudi.utilities.schema;
-import org.apache.avro.Schema;
+import org.apache.hudi.common.schema.HoodieSchema;
+
import org.apache.avro.SchemaNormalization;
import java.io.Serializable;
@@ -32,13 +33,13 @@ public class SchemaSet implements Serializable {
private final Set<Long> processedSchema = new HashSet<>();
- public boolean isSchemaPresent(Schema schema) {
- long schemaKey = SchemaNormalization.parsingFingerprint64(schema);
+ public boolean isSchemaPresent(HoodieSchema schema) {
+ long schemaKey = SchemaNormalization.parsingFingerprint64(schema.toAvroSchema());
return processedSchema.contains(schemaKey);
}
- public void addSchema(Schema schema) {
- long schemaKey = SchemaNormalization.parsingFingerprint64(schema);
+ public void addSchema(HoodieSchema schema) {
+ long schemaKey = SchemaNormalization.parsingFingerprint64(schema.toAvroSchema());
processedSchema.add(schemaKey);
}
}
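
A short sketch of the migrated SchemaSet API (illustrative only; the schema literal is hypothetical). Deduplication is still keyed on Avro's 64-bit parsing fingerprint, now computed from schema.toAvroSchema():

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.utilities.schema.SchemaSet;

    public class SchemaSetSketch {
      public static void main(String[] args) {
        SchemaSet processed = new SchemaSet();
        HoodieSchema schema = HoodieSchema.parse(
            "{\"type\":\"record\",\"name\":\"rec\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"}]}");
        if (!processed.isSchemaPresent(schema)) {
          // first time this fingerprint is seen; remember it
          processed.addSchema(schema);
        }
      }
    }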
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
index accbd55a8a83..2e08a078daf0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelectorCommon.java
@@ -18,10 +18,12 @@
package org.apache.hudi.utilities.sources.helpers;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieException;
@@ -35,7 +37,6 @@ import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -290,10 +291,9 @@ public class CloudObjectsSelectorCommon {
if (schemaProviderOption.isPresent()) {
HoodieSchema sourceSchema =
schemaProviderOption.get().getSourceHoodieSchema();
if (sourceSchema != null &&
!sourceSchema.equals(InputBatch.NULL_SCHEMA)) {
- Schema sourceAvroSchema = sourceSchema.toAvroSchema();
- rowSchema =
AvroConversionUtils.convertAvroSchemaToStructType(sourceAvroSchema);
+ rowSchema =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
if (isCoalesceRequired(properties, sourceSchema)) {
- reader = reader.schema(addAliasesToRowSchema(sourceAvroSchema,
rowSchema));
+ reader = reader.schema(addAliasesToRowSchema(sourceSchema,
rowSchema));
} else {
reader = reader.schema(rowSchema);
}
@@ -331,7 +331,7 @@ public class CloudObjectsSelectorCommon {
if (schemaProviderOption.isPresent()) {
HoodieSchema sourceSchema =
schemaProviderOption.get().getSourceHoodieSchema();
if (isCoalesceRequired(properties, sourceSchema)) {
- dataset = spark.createDataFrame(coalesceAliasFields(dataset,
sourceSchema.toAvroSchema()).rdd(), rowSchema);
+ dataset = spark.createDataFrame(coalesceAliasFields(dataset,
sourceSchema).rdd(), rowSchema);
}
}
@@ -352,19 +352,19 @@ public class CloudObjectsSelectorCommon {
private static boolean isCoalesceRequired(TypedProperties properties,
HoodieSchema sourceSchema) {
return getBooleanWithAltKeys(properties,
CloudSourceConfig.SPARK_DATASOURCE_READER_COALESCE_ALIAS_COLUMNS)
&& Objects.nonNull(sourceSchema)
- && hasFieldWithAliases(sourceSchema.toAvroSchema());
+ && hasFieldWithAliases(sourceSchema);
}
/**
- * Recursively checks if an Avro schema or any of its nested fields contain
aliases.
+ * Recursively checks if a schema or any of its nested fields contain
aliases.
*
- * @param schema The Avro schema to check.
+ * @param schema The schema to check.
* @return True if the schema or any of its fields contain aliases, false
otherwise.
*/
- private static boolean hasFieldWithAliases(Schema schema) {
+ private static boolean hasFieldWithAliases(HoodieSchema schema) {
// If the schema is a record, check its fields recursively
if (isNestedRecord(schema)) {
- for (Schema.Field field : getRecordFields(schema)) {
+ for (HoodieSchemaField field : getRecordFields(schema)) {
// Check if the field has aliases
if (!field.aliases().isEmpty()) {
return true;
@@ -379,27 +379,27 @@ public class CloudObjectsSelectorCommon {
return false;
}
- private static StructType addAliasesToRowSchema(Schema avroSchema,
StructType rowSchema) {
+ private static StructType addAliasesToRowSchema(HoodieSchema schema,
StructType rowSchema) {
Map<String, StructField> rowFieldsMap = Arrays.stream(rowSchema.fields())
.collect(Collectors.toMap(StructField::name, Function.identity()));
- StructField[] modifiedFields = getRecordFields(avroSchema).stream()
- .flatMap(avroField -> generateRowFieldsWithAliases(avroField,
rowFieldsMap.get(avroField.name())).stream())
+ StructField[] modifiedFields = getRecordFields(schema).stream()
+ .flatMap(field -> generateRowFieldsWithAliases(field,
rowFieldsMap.get(field.name())).stream())
.toArray(StructField[]::new);
return new StructType(modifiedFields);
}
- private static List<Schema.Field> getRecordFields(Schema schema) {
- if (schema.getType() == Schema.Type.RECORD) {
+ private static List<HoodieSchemaField> getRecordFields(HoodieSchema schema) {
+ if (schema.getType() == HoodieSchemaType.RECORD) {
return schema.getFields();
}
- if (schema.getType() == Schema.Type.UNION) {
+ if (schema.getType() == HoodieSchemaType.UNION) {
return schema.getTypes().stream()
- .filter(subSchema -> subSchema.getType() == Schema.Type.RECORD)
+ .filter(subSchema -> subSchema.getType() == HoodieSchemaType.RECORD)
.findFirst()
- .map(Schema::getFields)
+ .map(HoodieSchema::getFields)
.orElse(Collections.emptyList());
}
@@ -407,35 +407,35 @@ public class CloudObjectsSelectorCommon {
}
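
Worth spelling out how the UNION branch behaves. An optional record is modeled
as a union of NULL and the record type, and getRecordFields picks out the
record branch. The factory calls below mirror their usages elsewhere in this
commit (createRecord, createUnion, HoodieSchemaField.of); treat the exact
signatures as read off this diff rather than documented API. This is a
fragment, not a full class:

    import java.util.Arrays;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaField;
    import org.apache.hudi.common.schema.HoodieSchemaType;

    // An optional nested record: UNION(NULL, RECORD(city, zip)).
    HoodieSchema address = HoodieSchema.createRecord("Address", null, null,
        Arrays.asList(
            HoodieSchemaField.of("city", HoodieSchema.create(HoodieSchemaType.STRING)),
            HoodieSchemaField.of("zip", HoodieSchema.create(HoodieSchemaType.STRING))));
    HoodieSchema nullableAddress = HoodieSchema.createUnion(
        HoodieSchema.create(HoodieSchemaType.NULL), address);
    // getRecordFields(nullableAddress) scans the union branches, finds the
    // RECORD branch, and returns its two fields; a union with no record branch
    // yields an empty list, so the alias scan simply reports false for it.
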
/**
- * Generates a list of StructFields with aliases applied based on the
provided Avro field schema.
+ * Generates a list of StructFields with aliases applied based on the
provided field's schema.
* <p>
- * This method processes a given Avro field and its corresponding Spark SQL
StructField, handling
- * nested records and aliases. If the Avro field contains nested records,
the method recursively
- * updates the schema for these records and applies any aliases defined in
the Avro schema.
- * If the Avro field has aliases, they are added as new fields with nullable
set to true and
+ * This method processes a given field and its corresponding Spark SQL
StructField, handling
+ * nested records and aliases. If the field contains nested records, the
method recursively
+ * updates the schema for these records and applies any aliases defined in
the schema.
+ * If the field has aliases, they are added as new fields with nullable set
to true and
* appropriate metadata in the returned list. If no aliases or nesting are
present, the original
* StructField is returned unchanged.
*
- * @param avroField The Avro field schema to process.
- * @param rowField The corresponding Spark SQL StructField to map the Avro
field to.
- * @return A list of StructFields with aliases applied as per the Avro
schema.
+ * @param field The field from the schema to process.
+ * @param rowField The corresponding Spark SQL StructField to map the field
to.
+ * @return A list of StructFields with aliases applied as per the provided
schema.
*/
- private static List<StructField> generateRowFieldsWithAliases(Schema.Field
avroField, StructField rowField) {
+ private static List<StructField>
generateRowFieldsWithAliases(HoodieSchemaField field, StructField rowField) {
List<StructField> fieldList = new ArrayList<>();
// Handle nested records
- if (isNestedRecord(avroField.schema())) {
- StructType updatedSchema = addAliasesToRowSchema(avroField.schema(),
(StructType) rowField.dataType());
+ if (isNestedRecord(field.schema())) {
+ StructType updatedSchema = addAliasesToRowSchema(field.schema(),
(StructType) rowField.dataType());
- if (schemaModifiedOrHasAliases(avroField, updatedSchema, rowField)) {
+ if (schemaModifiedOrHasAliases(field, updatedSchema, rowField)) {
// Add the original field with the updated schema and add aliases if
present
- addFieldWithAliases(fieldList, avroField.name(), updatedSchema,
rowField.metadata(), avroField.aliases());
+ addFieldWithAliases(fieldList, field.name(), updatedSchema,
rowField.metadata(), field.aliases());
} else {
fieldList.add(rowField);
}
- } else if (!avroField.aliases().isEmpty()) {
+ } else if (!field.aliases().isEmpty()) {
// If the field has aliases, add them to the schema
- addFieldWithAliases(fieldList, avroField.name(), rowField.dataType(),
rowField.metadata(), avroField.aliases());
+ addFieldWithAliases(fieldList, field.name(), rowField.dataType(),
rowField.metadata(), field.aliases());
} else {
// No aliases or nesting, return the original field
fieldList.add(rowField);
@@ -448,22 +448,22 @@ public class CloudObjectsSelectorCommon {
aliases.forEach(alias -> fieldList.add(new StructField(alias, dataType,
true, metadata)));
}
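
Concretely, for a top-level field "city" carrying an alias "town", the
expansion above appends one extra nullable StructField per alias while leaving
the original untouched. A small before/after sketch with plain Spark types
(names are illustrative):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    StructType before = new StructType(new StructField[] {
        new StructField("city", DataTypes.StringType, true, Metadata.empty())});
    // After addAliasesToRowSchema: the alias is a sibling column, forced
    // nullable so its absence in any given file is not an error.
    StructType after = new StructType(new StructField[] {
        new StructField("city", DataTypes.StringType, true, Metadata.empty()),
        new StructField("town", DataTypes.StringType, true, Metadata.empty())});
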
- private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset, Schema
sourceSchema) {
+ private static Dataset<Row> coalesceAliasFields(Dataset<Row> dataset,
HoodieSchema sourceSchema) {
return coalesceNestedAliases(coalesceTopLevelAliases(dataset,
sourceSchema), sourceSchema);
}
/**
* Merges top-level fields with their aliases in the dataset.
* <p>
- * This method goes through the top-level fields in the Avro schema, and for
any field that has aliases,
+ * This method goes through the top-level fields in the schema, and for any
field that has aliases,
* it combines them in the dataset using a coalesce operation. This ensures
that if a field is null,
* the value from its alias is used instead.
*
* @param dataset The dataset to process.
- * @param sourceSchema The Avro schema defining the fields and their aliases.
+ * @param sourceSchema The schema defining the fields and their aliases.
* @return A dataset with fields merged with their aliases.
*/
- private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset,
Schema sourceSchema) {
+ private static Dataset<Row> coalesceTopLevelAliases(Dataset<Row> dataset,
HoodieSchema sourceSchema) {
return getRecordFields(sourceSchema).stream()
.filter(field -> !field.aliases().isEmpty())
.reduce(dataset,
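
The reduction above boils down to one withColumn per aliased field. A minimal
sketch of a single step with stock Spark column functions; field names are
illustrative, dataset is the variable from the surrounding code, and whether
the merged alias column is subsequently dropped is outside this hunk:

    import static org.apache.spark.sql.functions.coalesce;
    import static org.apache.spark.sql.functions.col;

    // Prefer the canonical column, fall back to its alias when null.
    Dataset<Row> merged = dataset.withColumn("city",
        coalesce(col("city"), col("town")));
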
@@ -482,17 +482,17 @@ public class CloudObjectsSelectorCommon {
/**
* Merges nested fields with their aliases in the dataset.
* <p>
- * This method iterates through the fields of the provided Avro schema and
checks if they represent
+ * This method iterates through the fields of the provided schema and checks
if they represent
* nested records. For each nested record, it verifies if there are any
alias fields present. If
* aliases are found, the method generates a list of nested fields,
coalescing them with their aliases,
* and creates a new column in the dataset with the merged data.
*
* @param dataset The dataset to process.
- * @param sourceSchema The Avro schema defining the structure and aliases of
the data.
+ * @param sourceSchema The schema defining the structure and aliases of the
data.
* @return A dataset with nested fields merged with their aliases.
*/
- private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset,
Schema sourceSchema) {
- for (Schema.Field field : getRecordFields(sourceSchema)) {
+ private static Dataset<Row> coalesceNestedAliases(Dataset<Row> dataset,
HoodieSchema sourceSchema) {
+ for (HoodieSchemaField field : getRecordFields(sourceSchema)) {
// check if this is a nested record and contains an alias field within
if (isNestedRecord(field.schema()) &&
hasFieldWithAliases(field.schema())) {
dataset = dataset.withColumn(field.name(),
functions.struct(getNestedFields("", field, dataset)));
@@ -501,32 +501,32 @@ public class CloudObjectsSelectorCommon {
return dataset;
}
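
For nested records the same merge rebuilds the struct column as a whole rather
than patching a single member. A sketch of what getNestedFields produces for an
"address" struct whose "city" member has alias "town" (stock Spark APIs,
illustrative names):

    import static org.apache.spark.sql.functions.coalesce;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.struct;

    // Rebuild "address": coalesce the aliased member, carry the rest through.
    Dataset<Row> rebuilt = dataset.withColumn("address", struct(
        coalesce(col("address.city"), col("address.town")).alias("city"),
        col("address.zip")));
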
- private static Column[] getNestedFields(String parentField, Schema.Field
field, Dataset<Row> dataset) {
+ private static Column[] getNestedFields(String parentField,
HoodieSchemaField field, Dataset<Row> dataset) {
return getRecordFields(field.schema()).stream()
- .map(avroField -> {
+ .map(schemaField -> {
List<Column> columns = new ArrayList<>();
String newParentField = getFullName(parentField, field.name());
- if (isNestedRecord(avroField.schema())) {
+ if (isNestedRecord(schemaField.schema())) {
// if field is nested, recursively fetch nested column
- columns.add(functions.struct(getNestedFields(newParentField,
avroField, dataset)));
+ columns.add(functions.struct(getNestedFields(newParentField,
schemaField, dataset)));
} else {
- columns.add(dataset.col(getFullName(newParentField,
avroField.name())));
+ columns.add(dataset.col(getFullName(newParentField,
schemaField.name())));
}
- avroField.aliases().forEach(alias ->
columns.add(dataset.col(getFullName(newParentField, alias))));
- // if avro field contains aliases, coalesce the column with others
matching the aliases otherwise return actual column
- return avroField.aliases().isEmpty() ? columns.get(0)
- : functions.coalesce(columns.toArray(new
Column[0])).alias(avroField.name());
+ schemaField.aliases().forEach(alias ->
columns.add(dataset.col(getFullName(newParentField, alias))));
+ // if the field has aliases, coalesce its column with the alias
columns; otherwise return the actual column
+ return schemaField.aliases().isEmpty() ? columns.get(0)
+ : functions.coalesce(columns.toArray(new
Column[0])).alias(schemaField.name());
}).toArray(Column[]::new);
}
- private static boolean isNestedRecord(Schema schema) {
- if (schema.getType() == Schema.Type.RECORD) {
+ private static boolean isNestedRecord(HoodieSchema schema) {
+ if (schema.getType() == HoodieSchemaType.RECORD) {
return true;
}
- if (schema.getType() == Schema.Type.UNION) {
+ if (schema.getType() == HoodieSchemaType.UNION) {
return schema.getTypes().stream()
- .anyMatch(subSchema -> subSchema.getType() == Schema.Type.RECORD);
+ .anyMatch(subSchema -> subSchema.getType() ==
HoodieSchemaType.RECORD);
}
return false;
@@ -536,8 +536,8 @@ public class CloudObjectsSelectorCommon {
return namespace.isEmpty() ? fieldName : namespace + "." + fieldName;
}
- private static boolean schemaModifiedOrHasAliases(Schema.Field avroField,
StructType modifiedNestedSchema, StructField rowField) {
- return !modifiedNestedSchema.equals(rowField.dataType()) ||
!avroField.aliases().isEmpty();
+ private static boolean schemaModifiedOrHasAliases(HoodieSchemaField field,
StructType modifiedNestedSchema, StructField rowField) {
+ return !modifiedNestedSchema.equals(rowField.dataType()) ||
!field.aliases().isEmpty();
}
private static Option<String> getPropVal(TypedProperties props,
ConfigProperty<String> configProperty) {
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 236f8fb1cda8..12e620156c49 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -19,11 +19,10 @@
package org.apache.hudi.utilities.streamer;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.AvroRecordContext;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
@@ -31,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.DeleteContext;
import org.apache.hudi.common.util.ConfigUtils;
@@ -104,8 +104,8 @@ public class HoodieStreamerUtils {
boolean requiresPayload = isChangingRecords(cfg.operation) &&
!HoodieWriteConfig.isFileGroupReaderBasedMergeHandle(props);
return avroRDDOptional.map(avroRDD -> {
- SerializableSchema avroSchema = new
SerializableSchema(schemaProvider.getTargetHoodieSchema().toAvroSchema());
- SerializableSchema processedAvroSchema = new
SerializableSchema(isDropPartitionColumns(props) ?
HoodieAvroUtils.removeMetadataFields(avroSchema.get()) : avroSchema.get());
+ HoodieSchema targetSchema = schemaProvider.getTargetHoodieSchema();
+ HoodieSchema processedSchema = isDropPartitionColumns(props) ?
HoodieSchemaUtils.removeMetadataFields(targetSchema) : targetSchema;
JavaRDD<Either<HoodieRecord,String>> records;
if (recordType == HoodieRecord.HoodieRecordType.AVRO) {
records = avroRDD.mapPartitions(
@@ -119,8 +119,7 @@ public class HoodieStreamerUtils {
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
}
BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
- HoodieSchema processedHoodieSchema =
HoodieSchema.fromAvroSchema(processedAvroSchema.get());
- DeleteContext deleteContext = new DeleteContext(props,
processedHoodieSchema).withReaderSchema(processedHoodieSchema);
+ DeleteContext deleteContext = new DeleteContext(props,
processedSchema).withReaderSchema(processedSchema);
return new
CloseableMappingIterator<>(ClosableIterator.wrap(genericRecordIterator), genRec
-> {
try {
if (shouldErrorTable) {
@@ -154,10 +153,10 @@ public class HoodieStreamerUtils {
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG,
instantTime);
}
BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator)
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
- StructType baseStructType =
AvroConversionUtils.convertAvroSchemaToStructType(processedAvroSchema.get());
- StructType targetStructType = isDropPartitionColumns(props) ?
AvroConversionUtils
-
.convertAvroSchemaToStructType(HoodieAvroUtils.removeFields(processedAvroSchema.get(),
partitionColumns)) : baseStructType;
- HoodieAvroDeserializer deserializer =
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(HoodieSchema.fromAvroSchema(processedAvroSchema.get()),
baseStructType);
+ StructType baseStructType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(processedSchema);
+ StructType targetStructType = isDropPartitionColumns(props) ?
HoodieSchemaConversionUtils
+
.convertHoodieSchemaToStructType(HoodieSchemaUtils.removeFields(processedSchema,
partitionColumns)) : baseStructType;
+ HoodieAvroDeserializer deserializer =
SparkAdapterSupport$.MODULE$.sparkAdapter().createAvroDeserializer(processedSchema,
baseStructType);
return new CloseableMappingIterator<>(ClosableIterator.wrap(itr),
rec -> {
InternalRow row = (InternalRow)
deserializer.deserialize(rec).get();
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
index d23e91b18a1d..bffe46f637fd 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SourceFormatAdapter.java
@@ -20,11 +20,12 @@
package org.apache.hudi.utilities.streamer;
import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.MercifulJsonConverter;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
@@ -42,7 +43,6 @@ import org.apache.hudi.utilities.sources.helpers.RowConverter;
import org.apache.hudi.utilities.sources.helpers.SanitizationUtils;
import com.google.protobuf.Message;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Column;
@@ -206,7 +206,7 @@ public class SourceFormatAdapter implements Closeable {
// pass in the schema for the Row-to-Avro conversion
// to avoid nullability mismatch between Avro schema and
Row schema
? HoodieSparkUtils.createRdd(rdd,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, true,
-
Option.ofNullable(r.getSchemaProvider().getSourceHoodieSchema().toAvroSchema())
+
Option.ofNullable(r.getSchemaProvider().getSourceHoodieSchema())
).toJavaRDD() : HoodieSparkUtils.createRdd(rdd,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, false,
Option.empty()).toJavaRDD();
})
@@ -255,12 +255,12 @@ public class SourceFormatAdapter implements Closeable {
}
case JSON: {
InputBatch<JavaRDD<String>> r = ((Source<JavaRDD<String>>)
source).fetchNext(lastCheckpoint, sourceLimit);
- Schema sourceSchema =
r.getSchemaProvider().getSourceHoodieSchema().toAvroSchema();
+ HoodieSchema sourceSchema =
r.getSchemaProvider().getSourceHoodieSchema();
// Decimal fields need additional decoding from json generated by
kafka-connect. JSON -> ROW conversion is done through
// a spark library that has not implemented this decoding
if (isFieldNameSanitizingEnabled()
- || (HoodieAvroUtils.hasDecimalField(sourceSchema) && source
instanceof KafkaSource)) {
- StructType dataType =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+ || (HoodieSchemaUtils.hasDecimalField(sourceSchema) && source
instanceof KafkaSource)) {
+ StructType dataType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
JavaRDD<Row> rowRDD = transformJsonToRowRdd(r);
if (rowRDD != null) {
Dataset<Row> rowDataset =
source.getSparkSession().createDataFrame(rowRDD, dataType);
@@ -274,7 +274,7 @@ public class SourceFormatAdapter implements Closeable {
// if error table writer is enabled, during spark read
`columnNameOfCorruptRecord` option is configured.
// Any records which spark is unable to read successfully are
transferred to the column
// configured via this option. The column is then used to trigger
error events.
- StructType dataType =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema)
+ StructType dataType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema)
.add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME,
DataTypes.StringType, true, Metadata.empty()));
StructType nullableStruct = dataType.asNullable();
Option<Dataset<Row>> dataset = r.getBatch().map(rdd ->
source.getSparkSession().read()
@@ -288,7 +288,7 @@ public class SourceFormatAdapter implements Closeable {
eventsDataset,
r.getCheckpointForNextBatch(), r.getSchemaProvider());
} else {
- StructType dataType =
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchema);
+ StructType dataType =
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchema);
return new InputBatch<>(
Option.ofNullable(
r.getBatch().map(rdd ->
HoodieSparkUtils.maybeWrapDataFrameWithException(source.getSparkSession().read().schema(dataType).json(rdd),
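
The corrupt-record plumbing above is plain Spark: extend the expected schema
with one string column, mark everything nullable, and point
columnNameOfCorruptRecord at it so unparseable rows land there instead of
failing the read. A sketch under those assumptions; expectedSchema, spark, and
jsonRdd are stand-ins, and the column-name constant and asNullable() call are
the ones used in this file:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    StructType withCorruptCol = expectedSchema
        .add(new StructField(ERROR_TABLE_CURRUPT_RECORD_COL_NAME,
            DataTypes.StringType, true, Metadata.empty()))
        .asNullable();
    Dataset<Row> parsed = spark.read()
        .option("columnNameOfCorruptRecord", ERROR_TABLE_CURRUPT_RECORD_COL_NAME)
        .schema(withCorruptCol)
        .json(jsonRdd);
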
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index dc5513d1fdab..2995081d82d3 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -19,15 +19,13 @@
package org.apache.hudi.utilities.streamer;
-import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceUtils;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.HoodieConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSchemaUtils;
import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.avro.AvroSchemaUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
@@ -51,6 +49,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.model.debezium.DebeziumConstants;
import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -118,7 +117,6 @@ import org.apache.hudi.utilities.transform.Transformer;
import com.codahale.metrics.Timer;
import org.apache.avro.Schema;
-import org.apache.avro.SchemaCompatibility;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -148,6 +146,7 @@ import java.util.stream.Collectors;
import scala.Tuple2;
import static
org.apache.hudi.DataSourceUtils.createUserDefinedBulkInsertPartitioner;
+import static
org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordQualifiedName;
import static
org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE;
import static
org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static
org.apache.hudi.common.table.HoodieTableConfig.URL_ENCODE_PARTITIONING;
@@ -339,7 +338,7 @@ public class StreamSync implements Serializable, Closeable {
Source source = UtilHelpers.createSource(cfg.sourceClassName, props,
hoodieSparkContext.jsc(), sparkSession, metrics, streamContext);
this.formatAdapter = new SourceFormatAdapter(source,
this.errorTableWriter, Option.of(props));
- Supplier<Option<Schema>> schemaSupplier = schemaProvider == null ?
Option::empty : () ->
Option.ofNullable(schemaProvider.getSourceHoodieSchema()).map(HoodieSchema::toAvroSchema);
+ Supplier<Option<HoodieSchema>> schemaSupplier = schemaProvider == null ?
Option::empty : () -> Option.ofNullable(schemaProvider.getSourceHoodieSchema());
this.transformer =
UtilHelpers.createTransformer(Option.ofNullable(cfg.transformerClassNames),
schemaSupplier, this.errorTableWriter.isPresent());
}
@@ -541,18 +540,18 @@ public class StreamSync implements Serializable,
Closeable {
} else {
HoodieSchema newSourceSchema =
inputBatch.getSchemaProvider().getSourceHoodieSchema();
HoodieSchema newTargetSchema =
inputBatch.getSchemaProvider().getTargetHoodieSchema();
- if ((newSourceSchema != null &&
!processedSchema.isSchemaPresent(newSourceSchema.toAvroSchema()))
- || (newTargetSchema != null &&
!processedSchema.isSchemaPresent(newTargetSchema.toAvroSchema()))) {
+ if ((newSourceSchema != null &&
!processedSchema.isSchemaPresent(newSourceSchema))
+ || (newTargetSchema != null &&
!processedSchema.isSchemaPresent(newTargetSchema))) {
String sourceStr = newSourceSchema == null ? NULL_PLACEHOLDER :
newSourceSchema.toString(true);
String targetStr = newTargetSchema == null ? NULL_PLACEHOLDER :
newTargetSchema.toString(true);
LOG.info("Seeing new schema. Source: {}, Target: {}", sourceStr,
targetStr);
// We need to recreate write client with new schema and register them.
reInitWriteClient(newSourceSchema, newTargetSchema,
inputBatch.getBatch(), metaClient);
if (newSourceSchema != null) {
- processedSchema.addSchema(newSourceSchema.toAvroSchema());
+ processedSchema.addSchema(newSourceSchema);
}
if (newTargetSchema != null) {
- processedSchema.addSchema(newTargetSchema.toAvroSchema());
+ processedSchema.addSchema(newTargetSchema);
}
}
}
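
The guard above is the whole contract of the processedSchema set after this
change: it now holds HoodieSchema instances directly instead of round-tripping
through Avro. In sketch form, with method names as they appear in this diff and
newSchema as a stand-in:

    if (newSchema != null && !processedSchema.isSchemaPresent(newSchema)) {
      // recreate the write client with the new schema (as above), then record
      // the schema so later batches carrying the same schema skip the re-init
      processedSchema.addSchema(newSchema);
    }
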
@@ -714,7 +713,7 @@ public class StreamSync implements Serializable, Closeable {
rowDataset -> {
Tuple2<RDD<GenericRecord>, RDD<String>> safeCreateRDDs =
HoodieSparkUtils.safeCreateRDD(rowDataset,
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE,
reconcileSchema,
-
Option.of(finalSchemaProvider.getTargetHoodieSchema().toAvroSchema()));
+ Option.of(finalSchemaProvider.getTargetHoodieSchema()));
errorTableWriter.get().addErrorEvents(safeCreateRDDs._2().toJavaRDD()
.map(evStr -> new ErrorEvent<>(evStr,
ErrorEvent.ErrorReason.AVRO_DESERIALIZATION_FAILURE)));
@@ -722,14 +721,14 @@ public class StreamSync implements Serializable,
Closeable {
});
} else {
avroRDDOptional = transformed.map(
- rowDataset -> getTransformedRDD(rowDataset, reconcileSchema,
finalSchemaProvider.getTargetHoodieSchema().toAvroSchema()));
+ rowDataset -> getTransformedRDD(rowDataset, reconcileSchema,
finalSchemaProvider.getTargetHoodieSchema()));
}
}
} else {
// Deduce proper target (writer's) schema for the input dataset,
reconciling its
// schema w/ the table's one
HoodieSchema incomingSchema = transformed.map(df ->
-
HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
AvroSchemaUtils.getAvroRecordQualifiedName(cfg.targetTableName))))
+
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema(),
getRecordQualifiedName(cfg.targetTableName)))
.orElseGet(dataAndCheckpoint.getSchemaProvider()::getTargetHoodieSchema);
schemaProvider = getDeducedSchemaProvider(incomingSchema,
dataAndCheckpoint.getSchemaProvider(), metaClient);
@@ -738,7 +737,7 @@ public class StreamSync implements Serializable, Closeable {
} else {
// Rewrite transformed records into the expected target schema
SchemaProvider finalSchemaProvider = schemaProvider;
- avroRDDOptional = transformed.map(t -> getTransformedRDD(t,
reconcileSchema, finalSchemaProvider.getTargetHoodieSchema().toAvroSchema()));
+ avroRDDOptional = transformed.map(t -> getTransformedRDD(t,
reconcileSchema, finalSchemaProvider.getTargetHoodieSchema()));
}
}
} else {
@@ -792,7 +791,7 @@ public class StreamSync implements Serializable, Closeable {
*/
@VisibleForTesting
SchemaProvider getDeducedSchemaProvider(HoodieSchema incomingSchema,
SchemaProvider sourceSchemaProvider, HoodieTableMetaClient metaClient) {
- Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), storage,
cfg.targetBasePath, metaClient);
+ Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), storage,
cfg.targetBasePath, metaClient);
Option<InternalSchema> internalSchemaOpt =
HoodieConversionUtils.toJavaOption(
HoodieSchemaUtils.getLatestTableInternalSchema(
HoodieStreamer.Config.getProps(conf, cfg), metaClient));
@@ -801,7 +800,7 @@ public class StreamSync implements Serializable, Closeable {
HoodieSchema targetSchema = HoodieSchemaUtils.deduceWriterSchema(
incomingSchema == null ?
HoodieSchema.create(HoodieSchemaType.NULL) :
org.apache.hudi.common.schema.HoodieSchemaUtils.removeMetadataFields(incomingSchema),
- latestTableSchemaOpt.map(HoodieSchema::fromAvroSchema),
+ latestTableSchemaOpt,
internalSchemaOpt,
props);
@@ -810,7 +809,7 @@ public class StreamSync implements Serializable, Closeable {
new SimpleSchemaProvider(hoodieSparkContext.jsc(),
targetSchema, props));
}
- private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset,
boolean reconcileSchema, Schema readerSchema) {
+ private JavaRDD<GenericRecord> getTransformedRDD(Dataset<Row> rowDataset,
boolean reconcileSchema, HoodieSchema readerSchema) {
return HoodieSparkUtils.createRdd(rowDataset, HOODIE_RECORD_STRUCT_NAME,
HOODIE_RECORD_NAMESPACE, reconcileSchema,
Option.ofNullable(readerSchema)).toJavaRDD();
}
@@ -1089,7 +1088,7 @@ public class StreamSync implements Serializable,
Closeable {
private void reInitWriteClient(HoodieSchema sourceSchema, HoodieSchema
targetSchema, Option<JavaRDD<HoodieRecord>> recordsOpt, HoodieTableMetaClient
metaClient) throws IOException {
LOG.info("Setting up new Hoodie Write Client");
if (HoodieStreamerUtils.isDropPartitionColumns(props)) {
- targetSchema =
HoodieSchema.fromAvroSchema(HoodieAvroUtils.removeFields(targetSchema.toAvroSchema(),
HoodieStreamerUtils.getPartitionColumns(props)));
+ targetSchema =
org.apache.hudi.common.schema.HoodieSchemaUtils.removeFields(targetSchema,
HoodieStreamerUtils.getPartitionColumns(props));
}
final Pair<HoodieWriteConfig, HoodieSchema> initialWriteConfigAndSchema =
getHoodieClientConfigAndWriterSchema(targetSchema, true, metaClient);
final HoodieWriteConfig initialWriteConfig =
initialWriteConfigAndSchema.getLeft();
@@ -1223,16 +1222,15 @@ public class StreamSync implements Serializable,
Closeable {
HoodieSchema newWriteSchema = targetSchema;
try {
// check if targetSchema is equal to NULL schema
- Schema nullSchema = InputBatch.NULL_SCHEMA.toAvroSchema();
- if (targetSchema == null ||
(SchemaCompatibility.checkReaderWriterCompatibility(targetSchema.toAvroSchema(),
nullSchema).getType() == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE
- && SchemaCompatibility.checkReaderWriterCompatibility(nullSchema,
targetSchema.toAvroSchema()).getType() ==
SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE)) {
+ HoodieSchema nullSchema = InputBatch.NULL_SCHEMA;
+ if (targetSchema == null ||
(HoodieSchemaCompatibility.areSchemasCompatible(targetSchema, nullSchema) &&
HoodieSchemaCompatibility.areSchemasCompatible(nullSchema, targetSchema))) {
// target schema is null. fetch schema from commit metadata and use it
int totalCompleted =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants();
if (totalCompleted > 0) {
TableSchemaResolver schemaResolver = new
TableSchemaResolver(metaClient);
- Option<Schema> tableSchema =
schemaResolver.getTableAvroSchemaIfPresent(false);
+ Option<HoodieSchema> tableSchema =
schemaResolver.getTableSchemaIfPresent(false);
if (tableSchema.isPresent()) {
- newWriteSchema = HoodieSchema.fromAvroSchema(tableSchema.get());
+ newWriteSchema = tableSchema.get();
} else {
LOG.warn("Could not fetch schema from table. Falling back to using
target schema from schema provider");
}
@@ -1244,17 +1242,6 @@ public class StreamSync implements Serializable,
Closeable {
}
}
- /**
- * Register Avro Schemas.
- *
- * @param schemaProvider Schema Provider
- */
- private void registerAvroSchemas(SchemaProvider schemaProvider) {
- if (null != schemaProvider) {
- registerAvroSchemas(schemaProvider.getSourceHoodieSchema(),
schemaProvider.getTargetHoodieSchema());
- }
- }
-
/**
* Register Avro Schemas.
*
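
One subtlety in the compatibility check earlier in this file: a target schema
is treated as effectively null only when it is compatible with the NULL schema
in both reader and writer directions, which the new HoodieSchemaCompatibility
call expresses in one step per direction instead of the old Avro
SchemaCompatibility type inspection. A condensed sketch of the fallback flow,
with calls as they appear in this diff and Option.orElse assumed to behave like
java.util.Optional:

    HoodieSchema nullSchema = InputBatch.NULL_SCHEMA;
    boolean effectivelyNull = targetSchema == null
        || (HoodieSchemaCompatibility.areSchemasCompatible(targetSchema, nullSchema)
            && HoodieSchemaCompatibility.areSchemasCompatible(nullSchema, targetSchema));
    if (effectivelyNull) {
      // fall back to the latest committed table schema, if any commit exists
      Option<HoodieSchema> tableSchema =
          new TableSchemaResolver(metaClient).getTableSchemaIfPresent(false);
      newWriteSchema = tableSchema.orElse(newWriteSchema);
    }
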
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
index af8b2c5fa2d7..726770e5ad4a 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ChainedTransformer.java
@@ -18,15 +18,15 @@
package org.apache.hudi.utilities.transform;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
-import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -52,7 +52,7 @@ public class ChainedTransformer implements Transformer {
private static final String ID_TRANSFORMER_CLASS_NAME_DELIMITER = ":";
protected final List<TransformerInfo> transformers;
- private final Supplier<Option<Schema>> sourceSchemaSupplier;
+ private final Supplier<Option<HoodieSchema>> sourceSchemaSupplier;
public ChainedTransformer(List<Transformer> transformersList) {
this.transformers = new ArrayList<>(transformersList.size());
@@ -69,7 +69,7 @@ public class ChainedTransformer implements Transformer {
* @param sourceSchemaSupplier Supplies the schema (if schema
provider is present) for the dataset the transform is applied to
* @param configuredTransformers List of configured transformer
class names.
*/
- public ChainedTransformer(List<String> configuredTransformers,
Supplier<Option<Schema>> sourceSchemaSupplier) {
+ public ChainedTransformer(List<String> configuredTransformers,
Supplier<Option<HoodieSchema>> sourceSchemaSupplier) {
this.transformers = new ArrayList<>(configuredTransformers.size());
this.sourceSchemaSupplier = sourceSchemaSupplier;
@@ -121,12 +121,12 @@ public class ChainedTransformer implements Transformer {
private StructType getExpectedTransformedSchema(TransformerInfo
transformerInfo, JavaSparkContext jsc, SparkSession sparkSession,
Option<StructType>
incomingStructOpt, Option<Dataset<Row>> rowDatasetOpt, TypedProperties
properties) {
- Option<Schema> sourceSchemaOpt = sourceSchemaSupplier.get();
+ Option<HoodieSchema> sourceSchemaOpt = sourceSchemaSupplier.get();
if (!sourceSchemaOpt.isPresent() && !rowDatasetOpt.isPresent()) {
throw new HoodieTransformPlanException("Either source schema or source
dataset should be available to fetch the schema");
}
StructType incomingStruct = incomingStructOpt
- .orElseGet(() -> sourceSchemaOpt.isPresent() ?
AvroConversionUtils.convertAvroSchemaToStructType(sourceSchemaOpt.get()) :
rowDatasetOpt.get().schema());
+ .orElseGet(() -> sourceSchemaOpt.isPresent() ?
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(sourceSchemaOpt.get())
: rowDatasetOpt.get().schema());
return transformerInfo.getTransformer().transformedSchema(jsc,
sparkSession, incomingStruct, properties).asNullable();
}
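
Usage-wise, nothing changes for callers except the supplier's type parameter. A
sketch of wiring a chained transformer with a fixed schema source; the
transformer class name is the one exercised by this repo's tests below, and
sourceSchema is a stand-in:

    import java.util.Arrays;
    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.util.Option;

    ChainedTransformer transformer = new ChainedTransformer(
        Arrays.asList("org.apache.hudi.utilities.transform.FlatteningTransformer"),
        () -> Option.of(sourceSchema));
    // The supplier is consulted on each transformedSchema() call, so providers
    // whose schema evolves between batches are picked up automatically.
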
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
index 4d18ea9f11ba..a7d2979e90f0 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/transform/ErrorTableAwareChainedTransformer.java
@@ -20,10 +20,10 @@
package org.apache.hudi.utilities.transform;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.streamer.ErrorTableUtils;
-import org.apache.avro.Schema;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -39,7 +39,7 @@ import java.util.function.Supplier;
* if that column is not dropped in any of the transformations.
*/
public class ErrorTableAwareChainedTransformer extends ChainedTransformer {
- public ErrorTableAwareChainedTransformer(List<String>
configuredTransformers, Supplier<Option<Schema>> sourceSchemaSupplier) {
+ public ErrorTableAwareChainedTransformer(List<String>
configuredTransformers, Supplier<Option<HoodieSchema>> sourceSchemaSupplier) {
super(configuredTransformers, sourceSchemaSupplier);
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
index 9d92cb84ed47..60519d333de2 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionBase.java
@@ -19,8 +19,8 @@
package org.apache.hudi.utilities.deltastreamer;
-import org.apache.hudi.AvroConversionUtils;
import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
@@ -226,11 +226,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionBase
extends HoodieDeltaStrea
protected void addData(Dataset<Row> df, Boolean isFirst) {
if (useSchemaProvider) {
- TestSchemaProvider.sourceSchema = HoodieSchema.fromAvroSchema(
- AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE));
+ TestSchemaProvider.sourceSchema =
+
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(df.schema(),
HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE);
if (withErrorTable && isFirst) {
- TestSchemaProvider.setTargetSchema(HoodieSchema.fromAvroSchema(
-
AvroConversionUtils.convertStructTypeToAvroSchema(TestHoodieSparkUtils.getSchemaColumnNotNullable(df.schema(),
"_row_key"),"idk", "idk")));
+ TestSchemaProvider.setTargetSchema(
+
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(TestHoodieSparkUtils.getSchemaColumnNotNullable(df.schema(),
"_row_key"),"idk", "idk"));
}
}
if (useKafkaSource) {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 150851cc3e08..b11e21d4bbe3 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -21,6 +21,8 @@ package org.apache.hudi.utilities.deltastreamer;
import org.apache.hudi.TestHoodieSparkUtils;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
@@ -28,7 +30,6 @@ import org.apache.hudi.exception.MissingSchemaFieldException;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.streamer.HoodieStreamer;
-import org.apache.avro.Schema;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -334,10 +335,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
deltaStreamer.sync();
metaClient.reloadActiveTimeline();
- Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
+ Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
- assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
- .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
+ .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
}
@@ -409,10 +410,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
metaClient.reloadActiveTimeline();
- Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
+ Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
- .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
+ .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
} catch (MissingSchemaFieldException e) {
assertFalse(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
@@ -489,10 +490,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
assertTrue(allowNullForDeletedCols || targetSchemaSameAsTableSchema);
metaClient.reloadActiveTimeline();
- Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
+ Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("rider").schema().getTypes()
- .stream().anyMatch(t -> t.getType().equals(Schema.Type.STRING)));
+
assertTrue(latestTableSchemaOpt.get().getField("rider").get().schema().getTypes()
+ .stream().anyMatch(t -> t.getType() == HoodieSchemaType.STRING));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
} catch (Exception e) {
assertTrue(containsErrorMessage(e, "has no default value and is
non-nullable",
@@ -569,11 +570,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
assertFalse(targetSchemaSameAsTableSchema);
metaClient.reloadActiveTimeline();
- Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
+ Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").schema().getTypes()
- .stream().anyMatch(t -> t.getType().equals(Schema.Type.DOUBLE)),
-
latestTableSchemaOpt.get().getField("distance_in_meters").schema().toString());
+
assertTrue(latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().getTypes()
+ .stream().anyMatch(t -> t.getType() == HoodieSchemaType.DOUBLE),
+
latestTableSchemaOpt.get().getField("distance_in_meters").get().schema().toString());
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
} catch (Exception e) {
assertTrue(targetSchemaSameAsTableSchema);
@@ -657,10 +658,10 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick
extends TestHoodieDelta
deltaStreamer.sync();
metaClient.reloadActiveTimeline();
- Option<Schema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
+ Option<HoodieSchema> latestTableSchemaOpt =
UtilHelpers.getLatestTableSchema(jsc, storage,
dsConfig.targetBasePath, metaClient);
-
assertTrue(latestTableSchemaOpt.get().getField("current_ts").schema().getTypes()
- .stream().anyMatch(t -> t.getType().equals(Schema.Type.LONG)));
+
assertTrue(latestTableSchemaOpt.get().getField("current_ts").get().schema().getTypes()
+ .stream().anyMatch(t -> t.getType() == HoodieSchemaType.LONG));
assertTrue(metaClient.reloadActiveTimeline().lastInstant().get().compareTo(lastInstant)
> 0);
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
index d80517affd5e..fa09c8b5eb94 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestSourceFormatAdapter.java
@@ -19,7 +19,7 @@
package org.apache.hudi.utilities.deltastreamer;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
@@ -40,7 +40,7 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.avro.SchemaConverters;
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -110,7 +110,7 @@ public class TestSourceFormatAdapter {
TypedProperties typedProperties = new TypedProperties();
typedProperties.put(HoodieStreamerConfig.SANITIZE_SCHEMA_FIELD_NAMES.key(),
true);
typedProperties.put(HoodieStreamerConfig.SCHEMA_FIELD_NAME_INVALID_CHAR_MASK.key(),
"__");
- setupJsonSource(rdd,
HoodieSchema.fromAvroSchema(SchemaConverters.toAvroType(sanitizedSchema, false,
"record", "")));
+ setupJsonSource(rdd,
HoodieSparkSchemaConverters.toHoodieType(sanitizedSchema, false, "record", ""));
SourceFormatAdapter sourceFormatAdapter = new
SourceFormatAdapter(testJsonDataSource, Option.empty(),
Option.of(typedProperties));
return sourceFormatAdapter.fetchNewDataInRowFormat(Option.of(new
StreamerCheckpointV2(DUMMY_CHECKPOINT)), 10L);
}
@@ -122,8 +122,8 @@ public class TestSourceFormatAdapter {
assertEquals(2, ds.collectAsList().size());
assertEquals(sanitizedSchema, ds.schema());
if (inputBatch.getSchemaProvider() instanceof RowBasedSchemaProvider) {
-
assertEquals(HoodieSchema.fromAvroSchema(AvroConversionUtils.convertStructTypeToAvroSchema(sanitizedSchema,
- "hoodie_source", "hoodie.source")),
inputBatch.getSchemaProvider().getSourceHoodieSchema());
+
assertEquals(HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(sanitizedSchema,
+ "hoodie_source", "hoodie.source"),
inputBatch.getSchemaProvider().getSourceHoodieSchema());
}
assertEquals(expectedRDD.collect(), ds.toJSON().collectAsList());
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
index cb4bffd7e823..4fe858672635 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestChainedTransformer.java
@@ -20,13 +20,13 @@
package org.apache.hudi.utilities.functional;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.exception.HoodieTransformPlanException;
import org.apache.hudi.utilities.transform.ChainedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
-import org.apache.avro.Schema;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -43,8 +43,8 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
-import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.AVRO_SCHEMA;
-import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.NESTED_AVRO_SCHEMA;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.HOODIE_SCHEMA;
+import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.NESTED_SCHEMA;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
import static org.apache.spark.sql.types.DataTypes.createStructField;
@@ -115,7 +115,7 @@ public class TestChainedTransformer extends
SparkClientFunctionalTestHarness {
@Test
public void testChainedTransformerTransformedSchema() {
String transformerName =
"org.apache.hudi.utilities.transform.FlatteningTransformer";
- ChainedTransformer transformer = new
ChainedTransformer(Arrays.asList(transformerName.split(",")), () ->
Option.of(NESTED_AVRO_SCHEMA));
+ ChainedTransformer transformer = new
ChainedTransformer(Arrays.asList(transformerName.split(",")), () ->
Option.of(NESTED_SCHEMA));
StructType transformedSchema = transformer.transformedSchema(jsc(),
spark(), null, new TypedProperties());
// Verify transformed nested fields are present in the transformed schema
assertTrue(Arrays.asList(transformedSchema.fieldNames()).contains("fare_amount"));
@@ -127,11 +127,11 @@ public class TestChainedTransformer extends
SparkClientFunctionalTestHarness {
public void assertSchemaSupplierIsCalledPerInvocationOfTransformedSchema() {
String transformerName =
"org.apache.hudi.utilities.transform.FlatteningTransformer";
AtomicInteger count = new AtomicInteger(0);
- Supplier<Option<Schema>> schemaSupplier = () -> {
+ Supplier<Option<HoodieSchema>> schemaSupplier = () -> {
if (count.getAndIncrement() == 0) {
- return Option.of(AVRO_SCHEMA);
+ return Option.of(HOODIE_SCHEMA);
} else {
- return Option.of(NESTED_AVRO_SCHEMA);
+ return Option.of(NESTED_SCHEMA);
}
};
ChainedTransformer transformer = new
ChainedTransformer(Arrays.asList(transformerName.split(",")), schemaSupplier);
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
index 8adfdb4dc377..9c31a39abe70 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGenericRddTransform.java
@@ -18,13 +18,13 @@
package org.apache.hudi.utilities.sources;
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
@@ -53,7 +53,7 @@ public class TestGenericRddTransform extends
SparkClientFunctionalTestHarness {
StructType structType = new StructType(new StructField[] {
new StructField("id", DataTypes.StringType, false, Metadata.empty()),
new StructField("null_check_col", DataTypes.StringType, false,
Metadata.empty())});
- Schema nonNullSchema =
AvroConversionUtils.convertStructTypeToAvroSchema(structType,"record","record");
+ HoodieSchema nonNullSchema =
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(structType,"record","record");
Tuple2<RDD<GenericRecord>, RDD<String>> failSafeRdds =
HoodieSparkUtils.safeCreateRDD(ds, "record",
"record",false, Option.of(nonNullSchema));
assertEquals(5, failSafeRdds._1.count());
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
index e2a2b53bdd38..d07e2ddc1744 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/SanitizationTestUtils.java
@@ -24,8 +24,6 @@ import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.MapType;
@@ -134,29 +132,25 @@ public class SanitizationTestUtils {
}
public static HoodieSchema generateRenamedSchemaWithDefaultReplacement() {
- Schema addressSchema = SchemaBuilder.record("__Address").fields()
- .nullableString("__stree9add__ress", "@@@any_address")
- .requiredString("cit__y__")
- .endRecord();
- Schema personSchema = SchemaBuilder.record("Person").fields()
- .requiredString("__firstname")
- .requiredString("__lastname")
- .name("address").type(addressSchema).noDefault()
- .endRecord();
- return HoodieSchema.fromAvroSchema(personSchema);
+ HoodieSchema addressSchema = HoodieSchema.createRecord("__Address", null,
null,
+ Arrays.asList(
+ HoodieSchemaField.of("__stree9add__ress",
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.STRING),
HoodieSchema.create(HoodieSchemaType.NULL)), null, "@@@any_address"),
+ HoodieSchemaField.of("cit__y__",
HoodieSchema.create(HoodieSchemaType.STRING))));
+ return HoodieSchema.createRecord("Person", null, null,
+ Arrays.asList(HoodieSchemaField.of("__firstname",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("__lastname",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("address", addressSchema)));
}
public static HoodieSchema generateRenamedSchemaWithConfiguredReplacement() {
- Schema addressSchema = SchemaBuilder.record("_Address").fields()
- .nullableString("_stree9add_ress", "@@@any_address")
- .requiredString("cit_y_")
- .endRecord();
- Schema personSchema = SchemaBuilder.record("Person").fields()
- .requiredString("_firstname")
- .requiredString("_lastname")
- .name("address").type(addressSchema).noDefault()
- .endRecord();
- return HoodieSchema.fromAvroSchema(personSchema);
+ HoodieSchema addressSchema = HoodieSchema.createRecord("_Address", null,
null,
+ Arrays.asList(
+ HoodieSchemaField.of("_stree9add_ress",
HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.STRING),
HoodieSchema.create(HoodieSchemaType.NULL)), null, "@@@any_address"),
+ HoodieSchemaField.of("cit_y_",
HoodieSchema.create(HoodieSchemaType.STRING))));
+ return HoodieSchema.createRecord("Person", null, null,
+ Arrays.asList(HoodieSchemaField.of("_firstname",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("_lastname",
HoodieSchema.create(HoodieSchemaType.STRING)),
+ HoodieSchemaField.of("address", addressSchema)));
}
public static Stream<Arguments> provideDataFiles() {
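
A note on the union ordering used in the rewritten builders above: the nullable
string fields are declared as UNION(STRING, NULL) with a string default,
matching the Avro rule that a field's default value must conform to the first
branch of its union; a default-less optional field would put NULL first
instead. Minimal sketch, with factory signatures as used above and an
illustrative field name and default:

    // Optional string WITH a default: the non-null branch must come first.
    HoodieSchemaField withDefault = HoodieSchemaField.of("street",
        HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.STRING),
            HoodieSchema.create(HoodieSchemaType.NULL)), null, "unknown");
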