[ https://issues.apache.org/jira/browse/FLINK-20234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17356294#comment-17356294 ]
luoyuxia commented on FLINK-20234:
----------------------------------
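We can set the option {{'debezium-json.ignore-parse-errors' = 'true'}} in the table's WITH clause to avoid this problem. Rows that fail to parse are then skipped instead of failing the job.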
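This option works around the failure rather than supporting null elements. Rows with parse errors are skipped instead of failing the job, so the change event carrying {{"roles": [null]}} is dropped. The Java sketch below models only that fail-versus-skip choice, using plain lists instead of JSON. {{IgnoreParseErrorsSketch}}, {{decodeRoles}} and {{decodeAll}} are hypothetical names, not Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the fail-vs-skip choice made by
// 'debezium-json.ignore-parse-errors'; this is not Flink's actual code.
class IgnoreParseErrorsSketch {

    // Stand-in for an element converter that is not null-safe: it dereferences
    // every array element, so a null element throws a NullPointerException.
    static String[] decodeRoles(List<String> roles) {
        String[] out = new String[roles.size()];
        for (int i = 0; i < roles.size(); i++) {
            out[i] = roles.get(i).trim(); // NPE on a null element
        }
        return out;
    }

    // With ignoreParseErrors=false the first failure propagates (in a job this
    // fails the source and triggers a restart). With ignoreParseErrors=true the
    // failing record is dropped and decoding continues.
    static List<String[]> decodeAll(List<List<String>> records, boolean ignoreParseErrors) {
        List<String[]> decoded = new ArrayList<>();
        for (List<String> record : records) {
            try {
                decoded.add(decodeRoles(record));
            } catch (RuntimeException e) {
                if (!ignoreParseErrors) {
                    throw new IllegalStateException("Corrupt record: " + record, e);
                }
                // ignoreParseErrors=true: the whole record is skipped silently
            }
        }
        return decoded;
    }

    public static void main(String[] args) {
        List<List<String>> records = Arrays.asList(
                Arrays.asList("admin", "user"),
                Arrays.asList((String) null)); // like "roles": [null]
        System.out.println(decodeAll(records, true).size()); // prints 1
        try {
            decodeAll(records, false);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage()); // failed: Corrupt record: [null]
        }
    }
}
```

So the restart loop stops, but the affected change events are lost. The issue asks for null elements to be deserialized instead.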
> Json format supports SE/DE null elements of ARRAY type field
> ------------------------------------------------------------
>
> Key: FLINK-20234
> URL: https://issues.apache.org/jira/browse/FLINK-20234
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
> Affects Versions: 1.11.2
> Reporter: Danny Chen
> Priority: Minor
> Labels: auto-deprioritized-major
>
> Reported on the USER mailing list:
> Hi,
> I recently discovered that some of our data has NULL values arriving in an
> ARRAY<STRING> column. This column is consumed by Flink via the Kafka
> connector with the Debezium format. We seem to be receiving NullPointerExceptions
> when these NULL values arrive in the arrays, which restarts the source
> operator in a loop.
> Is there any way to avoid throwing, or to filter out NULLs in an array of
> strings in Flink?
> We're somewhat stuck on how to solve this problem; we'd like to be defensive
> about this on Flink's side.
> Thanks!
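The two behaviours asked about above can be sketched without Flink. Each element goes through a null-aware wrapper. Null elements are then either kept, so {{[null]}} stays a one-element array (what the issue title asks for), or dropped. {{NullSafeArraySketch}}, {{keepNulls}} and {{dropNulls}} are hypothetical names. A real format would plug this into its own element converters and Flink's internal array types.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical null-tolerant conversion of an ARRAY<STRING> value;
// this is not Flink's actual JSON converter.
class NullSafeArraySketch {

    // Wraps an element converter so that a null element maps to null
    // instead of being dereferenced.
    static <A, B> Function<A, B> nullSafe(Function<A, B> elementConverter) {
        return a -> a == null ? null : elementConverter.apply(a);
    }

    // Option 1: keep null elements, so [null] stays a one-element array.
    static List<String> keepNulls(List<String> raw) {
        Function<String, String> conv = nullSafe(String::trim);
        List<String> out = new ArrayList<>(raw.size());
        for (String s : raw) {
            out.add(conv.apply(s));
        }
        return out;
    }

    // Option 2: drop null elements, so [null] becomes an empty array.
    static List<String> dropNulls(List<String> raw) {
        return raw.stream()
                .filter(Objects::nonNull)
                .map(String::trim)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> roles = Arrays.asList("admin", null, " user ");
        System.out.println(keepNulls(roles)); // prints [admin, null, user]
        System.out.println(dropNulls(roles)); // prints [admin, user]
    }
}
```

Keeping nulls preserves the array's length and positions, which matches the {{"optional": true}} element schema. Dropping them changes the data and is better left as an explicit user choice.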
> The JSON:
> {code:java}
> {
>   "schema": {
>     "type": "struct",
>     "fields": [
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "before"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "int32", "optional": false, "field": "id" },
>           {
>             "type": "array",
>             "items": { "type": "string", "optional": true },
>             "optional": false,
>             "field": "roles"
>           }
>         ],
>         "optional": true,
>         "name": "db.public.data.Value",
>         "field": "after"
>       },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "version" },
>           { "type": "string", "optional": false, "field": "connector" },
>           { "type": "string", "optional": false, "field": "name" },
>           { "type": "int64", "optional": false, "field": "ts_ms" },
>           {
>             "type": "string",
>             "optional": true,
>             "name": "io.debezium.data.Enum",
>             "version": 1,
>             "parameters": { "allowed": "true,last,false" },
>             "default": "false",
>             "field": "snapshot"
>           },
>           { "type": "string", "optional": false, "field": "db" },
>           { "type": "string", "optional": false, "field": "schema" },
>           { "type": "string", "optional": false, "field": "table" },
>           { "type": "int64", "optional": true, "field": "txId" },
>           { "type": "int64", "optional": true, "field": "lsn" },
>           { "type": "int64", "optional": true, "field": "xmin" }
>         ],
>         "optional": false,
>         "name": "io.debezium.connector.postgresql.Source",
>         "field": "source"
>       },
>       { "type": "string", "optional": false, "field": "op" },
>       { "type": "int64", "optional": true, "field": "ts_ms" },
>       {
>         "type": "struct",
>         "fields": [
>           { "type": "string", "optional": false, "field": "id" },
>           { "type": "int64", "optional": false, "field": "total_order" },
>           {
>             "type": "int64",
>             "optional": false,
>             "field": "data_collection_order"
>           }
>         ],
>         "optional": true,
>         "field": "transaction"
>       }
>     ],
>     "optional": false,
>     "name": "db.public.data.Envelope"
>   },
>   "payload": {
>     "before": null,
>     "after": {
>       "id": 76704,
>       "roles": [null]
>     },
>     "source": {
>       "version": "1.3.0.Final",
>       "connector": "postgresql",
>       "name": "db",
>       "ts_ms": 1605739197360,
>       "snapshot": "true",
>       "db": "db",
>       "schema": "public",
>       "table": "data",
>       "txId": 1784,
>       "lsn": 1305806608,
>       "xmin": null
>     },
>     "op": "r",
>     "ts_ms": 1605739197373,
>     "transaction": null
>   }
> }
> {code}
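The payload above is the case that breaks: {{"roles": [null]}} against an {{items}} schema with {{"optional": true}}. The issue title covers serialization as well as deserialization. Below is a minimal, stdlib-only Java sketch of the write side, which writes a JSON {{null}} for each null element of an ARRAY<STRING> value instead of dereferencing it. {{NullArrayJsonWriterSketch}} and {{writeArray}} are hypothetical names. The hand-rolled string quoting stands in for a proper JSON library.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical minimal writer for an ARRAY<STRING> field; it emits a JSON
// null for null elements instead of dereferencing them. Not Flink code.
class NullArrayJsonWriterSketch {

    // Quotes a string as a JSON string literal, escaping quotes, backslashes
    // and control characters as JSON requires.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Serializes the array, writing the literal null for null elements.
    static String writeArray(List<String> values) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String v : values) {
            joiner.add(v == null ? "null" : quote(v));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(writeArray(Arrays.asList((String) null))); // prints [null]
        System.out.println(writeArray(Arrays.asList("admin", null))); // prints ["admin",null]
    }
}
```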
--
This message was sent by Atlassian Jira
(v8.3.4#803005)