[ 
https://issues.apache.org/jira/browse/FLINK-18096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346612#comment-17346612
 ] 

YoungWoo Kim commented on FLINK-18096:
--------------------------------------

[~jark], [~sjwiesman]
At the very least, a schema generated by Flink is not compatible with the 
Python parser of Apache Avro. The failure is caused by the record being named 
'record' without a namespace. For instance, I have the following Avro schema 
that was created by Flink:
```
{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "dt",
      "type": [
        "null",
        {
          "type": "int",
          "logicalType": "date"
        }
      ],
      "default": null
    },
    {
      "name": "value",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
```
Another application, which uses the Avro parser for Python, cannot parse this 
schema. See:
- https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroSchemaConverter.java#L307

The record named 'record' is not namespaced, so I got the following traceback 
from the Avro parser:
```
Traceback (most recent call last):
  File "/usr/local/bin/datahub", line 8, in <module>
    sys.exit(datahub())
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.8/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/datahub/entrypoints.py", line 74, in ingest
    pipeline.run()
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
    for wu in self.source.get_workunits():
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 79, in get_workunits
    mce = self._extract_record(t)
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/source/kafka.py", line 112, in _extract_record
    fields = schema_util.avro_schema_to_mce_fields(schema.schema_str)
  File "/usr/local/lib/python3.8/site-packages/datahub/ingestion/extractor/schema_util.py", line 117, in avro_schema_to_mce_fields
    parsed_schema: avro.schema.Schema = schema_parse_fn(avro_schema_string)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1244, in parse
    return SchemaFromJSONData(json_data, names)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1215, in SchemaFromJSONData
    return parser(json_data, names=names)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1136, in _SchemaFromJSONObject
    return RecordSchema(
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 1003, in __init__
    super(RecordSchema, self).__init__(
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 440, in __init__
    names.Register(self)
  File "/usr/local/lib/python3.8/site-packages/avro/schema.py", line 399, in Register
    raise SchemaParseException(
avro.schema.SchemaParseException: record is a reserved type name.
```
See https://github.com/apache/avro/blob/master/lang/py3/avro/schema.py#L398
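
To illustrate why a namespace avoids the error, here is a rough stdlib-only 
sketch of the kind of check that fails. It is not the Avro library's code: the 
reserved-name set is my assumption of what the Python parser treats as 
reserved, and `fullname` and `is_reserved` are hypothetical helper names.

```python
import json

# Assumed set of names the Python Avro parser treats as reserved
# (primitive, named and complex type keywords); not copied from the library.
RESERVED_TYPE_NAMES = {
    "null", "boolean", "int", "long", "float", "double", "bytes", "string",
    "record", "enum", "fixed", "error", "array", "map", "union",
}


def fullname(schema: dict) -> str:
    """Return 'namespace.name', or just 'name' when there is no namespace."""
    name = schema["name"]
    namespace = schema.get("namespace")
    if "." in name or not namespace:
        return name
    return f"{namespace}.{name}"


def is_reserved(schema_json: str) -> bool:
    """True if the top-level schema's full name collides with a reserved name."""
    return fullname(json.loads(schema_json)) in RESERVED_TYPE_NAMES


# Same shape as the Flink-generated schema above, with the fields omitted.
flink_schema = '{"type": "record", "name": "record", "fields": []}'
namespaced = (
    '{"type": "record", "name": "record", '
    '"namespace": "org.apache.flink.formats.avro.generated", "fields": []}'
)

print(is_reserved(flink_schema))
print(is_reserved(namespaced))
```

Under these assumptions only the full name is compared, so the bare name 
'record' collides while a namespaced 'record' does not.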

At first, I thought that renaming the record to 'Record' or adding a 
namespace, e.g. `"namespace": "org.apache.flink.formats.avro.generated",`, to 
the current implementation would be possible solutions. However, it looks like 
making the name and namespace configurable, with default values, is the 
appropriate way to go.
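
Until something like that exists in Flink, a consumer could patch the schema 
string before handing it to the Python parser. This is only a sketch of such a 
workaround, not a proposed Flink change: `add_namespace` is a hypothetical 
helper, and the default namespace is just the example value mentioned above.

```python
import json

# Example namespace from the comment above; whether Flink should adopt it
# as a default is still open.
DEFAULT_NAMESPACE = "org.apache.flink.formats.avro.generated"


def add_namespace(schema_json: str, namespace: str = DEFAULT_NAMESPACE) -> str:
    """Return the schema with a top-level namespace, keeping an existing one."""
    schema = json.loads(schema_json)
    # setdefault leaves a namespace that is already present untouched.
    schema.setdefault("namespace", namespace)
    return json.dumps(schema, indent=2)


# Same shape as the Flink-generated schema, with the fields omitted.
generated = '{"type": "record", "name": "record", "fields": []}'
print(add_namespace(generated))
```

The patched string can then be passed to the parser in place of the original 
one. Note that this changes the full name of the record on the consumer side 
only.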

> Generated avro formats should support user specified name and namespace
> -----------------------------------------------------------------------
>
>                 Key: FLINK-18096
>                 URL: https://issues.apache.org/jira/browse/FLINK-18096
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Seth Wiesman
>            Priority: Minor
>              Labels: auto-deprioritized-major
>
> When avro schema is auto derived it should still be possible to specify 
> namespace and name. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
