Shawn Chang created HUDI-5034:
---------------------------------

             Summary: Enum info lost during schema conversion
                 Key: HUDI-5034
                 URL: https://issues.apache.org/jira/browse/HUDI-5034
             Project: Apache Hudi
          Issue Type: Bug
          Components: deltastreamer
            Reporter: Shawn Chang


When a transformer is used in a Deltastreamer sync, a SparkAvroPostProcessor is 
attached to the SchemaProvider by default (see 
[code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java#L485]).

In SparkAvroPostProcessor, the Avro schema is converted to a Spark StructType 
schema and then immediately converted back (see 
[code|https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java#L42]).

During this round trip, any 'enum' field in the original Avro schema is lost: 
the conversion to the StructType schema maps 'enum' to the 'string' type, and 
the conversion back to Avro keeps 'string' rather than restoring 'enum'.
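For illustration, the lossy round trip can be simulated without Spark. The mapping tables below are a hand-written stand-in for the real converter, not Hudi or Spark code:

```python
# Hypothetical stand-in for the Avro <-> StructType conversion: the
# struct-type side has no counterpart for Avro enums, so 'enum' collapses
# to 'string' on the way in and cannot be recovered on the way out.
AVRO_TO_STRUCT = {"enum": "string", "string": "string", "int": "integer"}
STRUCT_TO_AVRO = {"string": "string", "integer": "int"}

def round_trip(avro_type: str) -> str:
    """Convert an Avro field type to a struct type and straight back."""
    return STRUCT_TO_AVRO[AVRO_TO_STRUCT[avro_type]]

print(round_trip("int"))   # -> int (primitive type survives)
print(round_trip("enum"))  # -> string (enum info lost: the bug)
```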

 

Steps to reproduce:
 # Prepare an Avro schema that contains an enum field, sample below:
{code:json}
{
    "name": "accountDataRecord",
    "namespace": "sample.test",
    "type": "record",
    "fields": [
        {
            "name": "action",
            "type": {
                "name": "testEnum",
                "type": "enum",
                "symbols": [
                    "INSERT",
                    "UPDATE",
                    "DELETE"
                ]
            }
        },
        {"name": "ts", "type": "int"}
    ]
} {code}

 # Run Deltastreamer with a transformer
 # Observe the exception:
{code:java}
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2610)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2559)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2558)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2558)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1200)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1200)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2798)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2740)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2729)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:978)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2215)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2255)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1449)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1422)
    at org.apache.spark.rdd.RDD.$anonfun$isEmpty$1(RDD.scala:1557)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1557)
    at org.apache.hudi.AvroConversionUtils$.createDataFrame(AvroConversionUtils.scala:131)
    at org.apache.hudi.AvroConversionUtils.createDataFrame(AvroConversionUtils.scala)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.lambda$fetchNewDataInRowFormat$2(SourceFormatAdapter.java:109)
    at org.apache.hudi.common.util.Option.map(Option.java:108)
    at org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter.fetchNewDataInRowFormat(SourceFormatAdapter.java:109)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.fetchFromSource(DeltaSync.java:424)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.readFromSource(DeltaSync.java:398)
    at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:200)
    at org.apache.hudi.common.util.Option.ifPresent(Option.java:97)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:198)
    at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:549)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1000)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1089)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1098)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: org.apache.avro.AvroTypeException: Found sample.test.testEnum, expecting string
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:203)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:469)
    at org.apache.avro.reflect.ReflectDatumReader.readString(ReflectDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:459)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:191)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:142)
    at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:298)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.file.DataFileStream.next(DataFileStream.java:251)
    at org.apache.avro.mapreduce.AvroRecordReaderBase.nextKeyValue(AvroRecordReaderBase.java:126)
    at org.apache.avro.mapreduce.AvroKeyRecordReader.nextKeyValue(AvroKeyRecordReader.java:55)
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:247)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$SliceIterator.hasNext(Iterator.scala:268)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) {code}

Some logs I added to expose the issue:
{code:java}
// original schema
22/10/14 03:33:51 INFO UtilHelpers: UtilHelpers, wrapSchemaProviderWithPostProcessor, schema provider by the end: {
  "type" : "record",
  "name" : "accountDataRecord",
  "namespace" : "sample.test",
  "fields" : [ {
    "name" : "action",
    "type" : {
      "type" : "enum",
      "name" : "testEnum",
      "symbols" : [ "INSERT", "UPDATE", "DELETE" ]
    }
  }, {
    "name" : "ts",
    "type" : "int"
  } ]
}

/* Around this LOC (https://github.com/apache/hudi/blob/94c068ae4fd75ab011e6a3f1c593fdd5db42da3b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java#L673) the schema converted back has already lost the enum field */
22/10/14 03:33:51 INFO HoodieDeltaStreamer: HoodieDeltaStreamer, source schema from schema provider: {
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "action",
    "type" : "string"
  }, {
    "name" : "ts",
    "type" : "int"
  } ]
} {code}
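The two logged schemas can also be compared programmatically. This stand-alone check (plain Python, schema bodies copied from the log output above) confirms that only the 'action' field changed type during the round trip:

```python
import json

# Schema bodies copied from the log lines above.
original = json.loads("""
{ "type": "record", "name": "accountDataRecord", "namespace": "sample.test",
  "fields": [
    { "name": "action",
      "type": { "type": "enum", "name": "testEnum",
                "symbols": ["INSERT", "UPDATE", "DELETE"] } },
    { "name": "ts", "type": "int" } ] }
""")

round_tripped = json.loads("""
{ "type": "record", "name": "hoodie_source", "namespace": "hoodie.source",
  "fields": [
    { "name": "action", "type": "string" },
    { "name": "ts", "type": "int" } ] }
""")

def field_types(schema):
    """Map field name -> declared type for a record schema."""
    return {f["name"]: f["type"] for f in schema["fields"]}

before, after = field_types(original), field_types(round_tripped)

assert before["ts"] == after["ts"] == "int"  # primitive type survives
assert before["action"]["type"] == "enum"    # originally an enum...
assert after["action"] == "string"           # ...degraded to plain string
```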
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
