[
https://issues.apache.org/jira/browse/FLINK-20234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jark Wu updated FLINK-20234:
----------------------------
Description:
Reported on the USER mailing list:
Hi,
I recently discovered that some of our data has NULL values arriving in an
ARRAY<STRING> column. This column is consumed by Flink via the Kafka
connector in the Debezium format. We receive NullPointerExceptions when
these NULL values arrive in the arrays, which restarts the source operator
in a loop.
Is there any way to avoid throwing, or to filter out the NULLs in an array of
strings, in Flink?
We're somewhat stuck on how to solve this problem; we'd like to be defensive
about this on Flink's side.
Thanks!
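For context, a consuming table along the following lines would hit this. A minimal sketch in Java, assuming a Kafka topic named db.public.data (inferred from the Debezium schema names in the payload below) and placeholder connection properties:
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NullArrayRepro {

    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().build());

        // Hypothetical DDL matching the Debezium payload below:
        // "id" is int32 and "roles" is an array of nullable strings.
        tEnv.executeSql(
                "CREATE TABLE data (\n"
                        + "  id INT,\n"
                        + "  roles ARRAY<STRING>\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'db.public.data',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'repro',\n"
                        + "  'format' = 'debezium-json'\n"
                        + ")");

        // Once a record with "roles": [null] arrives, reading from the table
        // triggers the NullPointerException described above.
        tEnv.executeSql("SELECT id, roles FROM data");
    }
}
{code}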
The JSON:
{code:json}
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int32", "optional": false, "field": "id" },
          {
            "type": "array",
            "items": { "type": "string", "optional": true },
            "optional": false,
            "field": "roles"
          }
        ],
        "optional": true,
        "name": "db.public.data.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "int64", "optional": true, "field": "txId" },
          { "type": "int64", "optional": true, "field": "lsn" },
          { "type": "int64", "optional": true, "field": "xmin" }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "db.public.data.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "id": 76704,
      "roles": [null]
    },
    "source": {
      "version": "1.3.0.Final",
      "connector": "postgresql",
      "name": "db",
      "ts_ms": 1605739197360,
      "snapshot": "true",
      "db": "db",
      "schema": "public",
      "table": "data",
      "txId": 1784,
      "lsn": 1305806608,
      "xmin": null
    },
    "op": "r",
    "ts_ms": 1605739197373,
    "transaction": null
  }
}
{code}
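Until the format itself tolerates null elements, one defensive option on the Flink side is to drop the nulls with a user-defined scalar function before any downstream processing. A minimal sketch, where the function name FilterNullElements is hypothetical and not part of Flink:
{code:java}
import java.util.Arrays;

import org.apache.flink.table.functions.ScalarFunction;

/** Hypothetical UDF that removes null elements from an ARRAY<STRING> value. */
public class FilterNullElements extends ScalarFunction {

    public String[] eval(String[] roles) {
        if (roles == null) {
            return null; // preserve NULL arrays themselves
        }
        // e.g. ["admin", null] -> ["admin"]
        return Arrays.stream(roles)
                .filter(role -> role != null)
                .toArray(String[]::new);
    }
}
{code}
It could be registered via tEnv.createTemporarySystemFunction("FILTER_NULL_ELEMENTS", FilterNullElements.class) and applied as SELECT id, FILTER_NULL_ELEMENTS(roles) FROM data. Note that this only helps once deserialization itself no longer throws, which is what this issue addresses.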
was:
Reported on the USER mailing list:
Hi,
I recently discovered that some of our data has NULL values arriving in an
ARRAY<STRING> column. This column is consumed by Flink via the Kafka
connector in the Debezium format. We receive NullPointerExceptions when
these NULL values arrive in the arrays, which restarts the source operator
in a loop.
Is there any way to avoid throwing, or to filter out the NULLs in an array of
strings, in Flink?
We're somewhat stuck on how to solve this problem; we'd like to be defensive
about this on Flink's side.
Thanks!
> Json format supports SE/DE null elements of ARRAY type field
> ------------------------------------------------------------
>
> Key: FLINK-20234
> URL: https://issues.apache.org/jira/browse/FLINK-20234
> Project: Flink
> Issue Type: Improvement
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table
> SQL / Ecosystem
> Affects Versions: 1.11.2
> Reporter: Danny Chen
> Priority: Major
> Fix For: 1.12.0
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)