Shawn Chang created HUDI-5034:
---------------------------------
Summary: Enum info lost during schema conversion
Key: HUDI-5034
URL: https://issues.apache.org/jira/browse/HUDI-5034
Project: Apache Hudi
Issue Type: Bug
Components: deltastreamer
Reporter: Shawn Chang
When a transformer is used in a Deltastreamer sync, SparkAvroPostProcessor is attached to the SchemaProvider by default (see
[code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java#L485]).
In SparkAvroPostProcessor, the Avro schema is converted to a Spark struct type schema and then immediately converted back (see
[code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java#L42]).
This round trip is lossy for 'enum' fields: the struct type conversion maps 'enum' to 'string', and the conversion back to Avro keeps 'string' instead of restoring the original 'enum' type.
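The loss can be demonstrated by round-tripping a schema the same way the post processor does. Below is a minimal sketch written directly against Spark's SchemaConverters (which Hudi's conversion utilities delegate to); the record name and namespace arguments are illustrative assumptions, not the exact Hudi call sites:
{code:java}
import org.apache.avro.Schema;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.types.StructType;

public class EnumRoundTrip {
  public static void main(String[] args) {
    Schema original = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"accountDataRecord\",\"namespace\":\"sample.test\","
        + "\"fields\":[{\"name\":\"action\",\"type\":{\"type\":\"enum\",\"name\":\"testEnum\","
        + "\"symbols\":[\"INSERT\",\"UPDATE\",\"DELETE\"]}},{\"name\":\"ts\",\"type\":\"int\"}]}");

    // Avro -> Catalyst: Spark SQL has no enum type, so 'action' becomes StringType
    StructType struct = (StructType) SchemaConverters.toSqlType(original).dataType();

    // Catalyst -> Avro: no trace of the enum is left, so 'action' comes back as "string"
    Schema roundTripped =
        SchemaConverters.toAvroType(struct, false, "hoodie_source", "hoodie.source");
    System.out.println(roundTripped.toString(true)); // enum symbols are gone
  }
}
{code}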
Steps to reproduce:
# Prepare an Avro schema that contains an enum field, for example:
{code:java}
{
  "name": "accountDataRecord",
  "namespace": "sample.test",
  "type": "record",
  "fields": [
    {
      "name": "action",
      "type": {
        "name": "testEnum",
        "type": "enum",
        "symbols": ["INSERT", "UPDATE", "DELETE"]
      }
    },
    {"name": "ts", "type": "int"}
  ]
}
{code}
# Run Deltastreamer with a transformer
# Observe the exception below:
{code:java}
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:131)
at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.lambda$fetchNewDataInRowFormat$2(SourceFormatAdapter.java:109)
at org.apache.hudi.common.util.Option.map(Option.java:108)
at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:109)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:424)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.avro.AvroTypeException: Found sample.test.testEnum, expecting string
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:203)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:298)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.file.DataFileStream.next(DataFileStream.java:251)
at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:126)
at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:55)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
{code}
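The underlying "Found sample.test.testEnum, expecting string" failure can also be reproduced with plain Avro, without Hudi or Spark, because enum-to-string is not a legal Avro schema resolution. The following is a hypothetical standalone repro (not the Hudi read path itself) that writes a record with the original enum writer schema and reads it back with the degraded string reader schema:
{code:java}
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class EnumResolutionRepro {
  public static void main(String[] args) throws Exception {
    // Writer schema: 'action' is an enum, as in the source data
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"accountDataRecord\",\"namespace\":\"sample.test\","
        + "\"fields\":[{\"name\":\"action\",\"type\":{\"type\":\"enum\",\"name\":\"testEnum\","
        + "\"symbols\":[\"INSERT\",\"UPDATE\",\"DELETE\"]}}]}");
    // Reader schema: 'action' degraded to string, as after the round trip
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"accountDataRecord\",\"namespace\":\"sample.test\","
        + "\"fields\":[{\"name\":\"action\",\"type\":\"string\"}]}");

    // Encode one record with the enum writer schema
    GenericRecord rec = new GenericData.Record(writer);
    rec.put("action", new GenericData.EnumSymbol(writer.getField("action").schema(), "INSERT"));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writer).write(rec, enc);
    enc.flush();

    // Throws org.apache.avro.AvroTypeException: Found sample.test.testEnum, expecting string
    new GenericDatumReader<GenericRecord>(writer, reader)
        .read(null, DecoderFactory.get().binaryDecoder(out.toByteArray(), null));
  }
}
{code}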
Some logs I added to expose the issue:
{code:java}
// original schema
22/10/14 03:33:51 INFO UtilHelpers: UtilHelpers, wrapSchemaProviderWithPostProcessor, schema provider by the end: {
  "type" : "record",
  "name" : "accountDataRecord",
  "namespace" : "sample.test",
  "fields" : [ {
    "name" : "action",
    "type" : {
      "type" : "enum",
      "name" : "testEnum",
      "symbols" : [ "INSERT", "UPDATE", "DELETE" ]
    }
  }, {
    "name" : "ts",
    "type" : "int"
  } ]
}
/* Around this LOC
   (https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L673)
   the converted-back schema has already lost the enum */
22/10/14 03:33:51 INFO HoodieDeltaStreamer: HoodieDeltaStreamer, source schema from schema provider: {
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "action",
    "type" : "string"
  }, {
    "name" : "ts",
    "type" : "int"
  } ]
}
{code}