[
https://issues.apache.org/jira/browse/HUDI-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexander Filipchik updated HUDI-1602:
--------------------------------------
Description:
We are running a HUDI deltastreamer on a very complex stream. The schema is deeply
nested, with several levels of hierarchy (the Avro schema is around 6,600 LOC).
The version of HUDI that writes the dataset is 0.5-SNAPSHOT, and we recently
started attempting to upgrade to the latest version. However, the latest HUDI can't
read the provided dataset. The exception I get:
{code:java}
Got exception while parsing the arguments:Got exception while parsing the
arguments:Found recursive reference in Avro schema, which can not be processed
by Spark:{ "type" : "record", "name" : "array", "fields" : [ { "name" :
"id", "type" : [ "null", "string" ], "default" : null }, { "name" :
"type", "type" : [ "null", "string" ], "default" : null }, { "name" :
"exist", "type" : [ "null", "boolean" ], "default" : null } ]}
Stack trace:org.apache.spark.sql.avro.IncompatibleSchemaException:Found
recursive reference in Avro schema, which can not be processed by Spark:{
"type" : "record", "name" : "array", "fields" : [ { "name" : "id",
"type" : [ "null", "string" ], "default" : null }, { "name" : "type",
"type" : [ "null", "string" ], "default" : null }, { "name" : "exist",
"type" : [ "null", "boolean" ], "default" : null } ]}
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:75)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
at
org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
at
org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
at
org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
at
org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:67)
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89) at
org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53) at
org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
at
org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223) at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211) at
org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at
com.css.dw.spark.SQLHudiOutputJob.run(SQLHudiOutputJob.java:118) at
com.css.dw.spark.SQLHudiOutputJob.main(SQLHudiOutputJob.java:164) at
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at
org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at
org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920) at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929) at
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}
I wrote a simple test that opens a parquet file, loads its schema, and attempts to
convert it to Avro; it fails with the same error. It appears that an Avro
schema that looked like:
{noformat}
{
"name": "entity_path",
"type": [
"null",
{
"type": "record",
"name": "MenuEntityPath",
"fields": [
{
"name": "path_nodes",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "PathNode",
"namespace": "Menue_pathPath$",
"fields": [
{
"name": "id",
"type": [
"null",
{
"type": "string",
"avro.java.string": "String"
}
],
"default": null
},
{
"name": "type",
"type": [
"null",
{
"type": "enum",
"name": "MenuEntityType",
"namespace": "shared",
"symbols": [
"UNKNOWN"
]
}
],
"default": null
}
]
}
}
],
"default": null
}
]
}
],
"default": null
}
]
}
],
"default": null
},{noformat}
Is converted into:
{noformat}
[
"null",
{
"type": "record",
"name": "entity_path",
"fields": [
{
"name": "path_nodes",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "array",
"fields": [
{
"name": "id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "type",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "exist",
"type": [
"null",
"boolean"
],
"default": null
}
]
}
}
],
"default": null
},
{
"name": "exist",
"type": [
"null",
"boolean"
],
"default": null
}
]
}
]{noformat}
A couple of questions: has anyone run into similar issues, and what is the best
way forward?
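One possible way forward (a sketch of mine, not something HUDI ships): post-process the extracted Avro schema before handing it to Spark, renaming any record whose name has already been used so the synthetic "array"/"bag" wrappers no longer collide. The function name and renaming scheme here are my own, and this is only safe when the schema has no genuine recursion and no by-name references to the renamed records:

```python
def uniquify_record_names(schema, counter=None):
    """Walk a parsed Avro schema (JSON-like dicts/lists) and rename any
    record whose name was already used by an earlier record, appending a
    numeric suffix. Mutates and returns the schema. Only safe if the
    schema has no real recursion and no by-name type references."""
    if counter is None:
        counter = {}
    if isinstance(schema, list):  # union, e.g. ["null", {...}]
        for branch in schema:
            uniquify_record_names(branch, counter)
        return schema
    if not isinstance(schema, dict):  # primitive type name, e.g. "string"
        return schema
    if schema.get("type") == "record":
        name = schema["name"]
        n = counter.get(name, 0)
        counter[name] = n + 1
        if n:  # second or later use of this name: make it unique
            schema["name"] = f"{name}_{n}"
        for field in schema.get("fields", []):
            uniquify_record_names(field["type"], counter)
    elif schema.get("type") == "array":
        uniquify_record_names(schema["items"], counter)
    return schema

# Two distinct records both named "array", as in the extracted schema above:
s = {"type": "record", "name": "array", "fields": [
    {"name": "path_nodes", "type": {"type": "array", "items":
        {"type": "record", "name": "array", "fields": []}}}]}
uniquify_record_names(s)
print(s["fields"][0]["type"]["items"]["name"])  # prints: array_1
```

Applied right before `convertAvroSchemaToStructType`, this would sidestep the false recursion check, at the cost of schema names that no longer match the writer's.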
Edit:
I converted the dataset into plain parquet using Presto as an intermediary
(CREATE TABLE AS SELECT). The result fails with a similar error, but in a
different place:
{noformat}
Found recursive reference in Avro schema, which can not be processed by Spark:
{
"type" : "record",
"name" : "bag",
"fields" : [ {
"name" : "array_element",
"type" : [ "null", {
"type" : "record",
"name" : "array_element",
"fields" : [ {
"name" : "id",{noformat}
It looks like the parquet writer replaces arrays with synthetic records and
gives them all the same name.
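That would explain the error: Spark's SchemaConverters tracks record names seen along the current path and treats any repeat as recursion, so two distinct synthetic wrappers sharing one name ("array", "bag", "array_element") trip the check even though nothing is actually recursive. A minimal sketch of that check on parsed schema JSON (the function name is mine; Spark's real implementation lives in `SchemaConverters.toSqlTypeHelper`):

```python
import json

def find_recursive_reference(schema, seen=()):
    """Approximate Spark's recursive-reference check: walk an Avro schema
    (parsed JSON) and return the first record name that repeats on a
    single path from the root, or None if there is no repeat."""
    if isinstance(schema, list):  # union, e.g. ["null", {...}]
        for branch in schema:
            hit = find_recursive_reference(branch, seen)
            if hit:
                return hit
        return None
    if not isinstance(schema, dict):
        return None  # primitive type name, e.g. "string"
    t = schema.get("type")
    if t == "record":
        name = schema["name"]
        if name in seen:
            return name  # same record name twice on one path -> "recursion"
        seen = seen + (name,)
        for field in schema.get("fields", []):
            hit = find_recursive_reference(field["type"], seen)
            if hit:
                return hit
        return None
    if t == "array":
        return find_recursive_reference(schema["items"], seen)
    return None

# Shaped like the schema parquet-avro reconstructs from the Presto output:
# every synthetic list wrapper reuses the name "array_element".
schema = json.loads("""
{"type": "record", "name": "bag", "fields": [
  {"name": "array_element", "type": ["null",
    {"type": "record", "name": "array_element", "fields": [
      {"name": "inner", "type": {"type": "array", "items":
        {"type": "record", "name": "array_element", "fields": []}}}]}]}
]}
""")
print(find_recursive_reference(schema))  # prints: array_element
```

On a schema where every record name is unique along each path, the function returns None, which matches Spark accepting our original (pre-extraction) schema.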
> Corrupted Avro schema extracted from parquet file
> -------------------------------------------------
>
> Key: HUDI-1602
> URL: https://issues.apache.org/jira/browse/HUDI-1602
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Alexander Filipchik
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)