[
https://issues.apache.org/jira/browse/HUDI-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alexander Filipchik updated HUDI-1602:
--------------------------------------
Description:
We are running a Hudi DeltaStreamer on a very complex stream. The schema is deeply
nested, with several levels of hierarchy (the Avro schema is around 6600 lines).
The version of Hudi that writes the dataset is 0.5-SNAPSHOT, and we recently
started attempting an upgrade to the latest version. However, the latest Hudi
can't read the existing dataset. The exception I get:
{code:java}
Got exception while parsing the arguments:Got exception while parsing the arguments:Found recursive reference in Avro schema, which can not be processed by Spark:{ "type" : "record", "name" : "array", "fields" : [ { "name" : "id", "type" : [ "null", "string" ], "default" : null }, { "name" : "type", "type" : [ "null", "string" ], "default" : null }, { "name" : "exist", "type" : [ "null", "boolean" ], "default" : null } ]}
Stack trace:org.apache.spark.sql.avro.IncompatibleSchemaException:Found recursive reference in Avro schema, which can not be processed by Spark:{ "type" : "record", "name" : "array", "fields" : [ { "name" : "id", "type" : [ "null", "string" ], "default" : null }, { "name" : "type", "type" : [ "null", "string" ], "default" : null }, { "name" : "exist", "type" : [ "null", "boolean" ], "default" : null } ]}
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:75)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:89)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:105)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:82)
  at org.apache.spark.sql.avro.SchemaConverters$$anonfun$1.apply(SchemaConverters.scala:81)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlTypeHelper(SchemaConverters.scala:81)
  at org.apache.spark.sql.avro.SchemaConverters$.toSqlType(SchemaConverters.scala:46)
  at org.apache.hudi.AvroConversionUtils$.convertAvroSchemaToStructType(AvroConversionUtils.scala:56)
  at org.apache.hudi.MergeOnReadSnapshotRelation.<init>(MergeOnReadSnapshotRelation.scala:67)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:89)
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:53)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
  at com.css.dw.spark.SQLHudiOutputJob.run(SQLHudiOutputJob.java:118)
  at com.css.dw.spark.SQLHudiOutputJob.main(SQLHudiOutputJob.java:164)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
  at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
  at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
  at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
  at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
{code}
I wrote a simple test that opens a parquet file from the dataset, loads its
schema, and attempts to convert it to Avro; it fails with the same error.
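A minimal sketch of that test is below; the file path, the class name, and the
use of parquet-avro's AvroSchemaConverter for the parquet-to-Avro step are
assumptions for illustration (the toSqlType call is the one visible in the
stack trace above):
{code:java}
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.sql.avro.SchemaConverters;

public class SchemaRoundTripTest {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Read the schema from the footer of one parquet data file of the dataset.
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(new Path("<path-to-parquet-file>"), conf))) {
      MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema();
      // Convert the parquet schema back to Avro, mirroring what the Hudi read path does.
      Schema avroSchema = new AvroSchemaConverter(conf).convert(parquetSchema);
      // This is the call that throws IncompatibleSchemaException: the synthetic
      // record names produced by the conversion look like recursion to Spark.
      System.out.println(SchemaConverters.toSqlType(avroSchema));
    }
  }
}
{code}
It appears that an Avro schema that looked like: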
{noformat}
{
  "name": "entity_path",
  "type": [
    "null",
    {
      "type": "record",
      "name": "MenuEntityPath",
      "fields": [
        {
          "name": "path_nodes",
          "type": [
            "null",
            {
              "type": "array",
              "items": {
                "type": "record",
                "name": "PathNode",
                "namespace": "Menue_pathPath$",
                "fields": [
                  {
                    "name": "id",
                    "type": [
                      "null",
                      {
                        "type": "string",
                        "avro.java.string": "String"
                      }
                    ],
                    "default": null
                  },
                  {
                    "name": "type",
                    "type": [
                      "null",
                      {
                        "type": "enum",
                        "name": "MenuEntityType",
                        "namespace": "shared",
                        "symbols": [
                          "UNKNOWN"
                        ]
                      }
                    ],
                    "default": null
                  }
                ]
              }
            }
          ],
          "default": null
        }
      ]
    }
  ],
  "default": null
}
]
}
],
"default": null
},{noformat}
is converted into:
{noformat}
[
  "null",
  {
    "type": "record",
    "name": "entity_path",
    "fields": [
      {
        "name": "path_nodes",
        "type": [
          "null",
          {
            "type": "array",
            "items": {
              "type": "record",
              "name": "array",
              "fields": [
                {
                  "name": "id",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "type",
                  "type": [
                    "null",
                    "string"
                  ],
                  "default": null
                },
                {
                  "name": "exist",
                  "type": [
                    "null",
                    "boolean"
                  ],
                  "default": null
                }
              ]
            }
          }
        ],
        "default": null
      },
      {
        "name": "exist",
        "type": [
          "null",
          "boolean"
        ],
        "default": null
      }
    ]
  }
]{noformat}
A couple of questions: has anyone run into similar issues, and what is the best
way forward?
Edit:
I converted the dataset into plain parquet using Presto as an intermediary
(CREATE TABLE AS SELECT). Reading the result fails with a similar error, but in
a different place:
{noformat}
Found recursive reference in Avro schema, which can not be processed by Spark:
{
  "type" : "record",
  "name" : "bag",
  "fields" : [ {
    "name" : "array_element",
    "type" : [ "null", {
      "type" : "record",
      "name" : "array_element",
      "fields" : [ {
        "name" : "id",{noformat}
It looks like the parquet writer replaces array elements with synthetic wrapper
records and gives them all the same name ("array" in the original dataset,
"bag"/"array_element" in the Presto-converted one). When the schema is converted
back from parquet to Avro, every array-of-struct yields an identically named
record, and Spark's SchemaConverters treats the repeated names as a recursive
reference.
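For illustration, this is roughly how such a list is laid out in the parquet
files under the legacy list encodings; the layouts below are my assumption
based on the names in the errors, not dumps from the actual files:
{noformat}
// parquet-avro / Spark legacy style: the repeated group is named "array"
optional group path_nodes (LIST) {
  repeated group array {
    optional binary id (UTF8);
    optional binary type (UTF8);
    optional boolean exist;
  }
}

// Hive/Presto style: wrapper group "bag" with element "array_element"
optional group path_nodes (LIST) {
  repeated group bag {
    optional group array_element {
      optional binary id (UTF8);
    }
  }
}
{noformat}
Converting either layout back to Avro names the element record after the
wrapper group ("array" or "array_element"), so the same record name appears at
every nesting level.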
Also, the plain Spark reader works; I can open the parquet files directly with:
{noformat}
Dataset<Row> dataset = spark.read().parquet("<path>");{noformat}
> Corrupted Avro schema extracted from parquet file
> -------------------------------------------------
>
> Key: HUDI-1602
> URL: https://issues.apache.org/jira/browse/HUDI-1602
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: Alexander Filipchik
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)