[
https://issues.apache.org/jira/browse/FLINK-28591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Andrzej Swatowski updated FLINK-28591:
--------------------------------------
Description:
When using the Table API to insert data into an array of rows, the data is
apparently serialized incorrectly internally, which then surfaces as incorrect
output at the connectors. It happens when one of the table fields is a BIGINT
(and does not happen when it is an INT).
E.g., the following table:
{code:java}
CREATE TABLE wrongArray (
    foo BIGINT,
    bar ARRAY<ROW<`foo1` STRING, `foo2` STRING>>
) WITH (
    'connector' = 'filesystem',
    'path' = 'file://path/to/somewhere',
    'format' = 'json'
) {code}
along with the following insert:
{code:java}
INSERT INTO wrongArray (
    SELECT
        1,
        ARRAY[
            ('Field1', 'Value1'),
            ('Field2', 'Value2')
        ]
    FROM (VALUES(1))
) {code}
gets serialized into:
{code:java}
{
    "foo": 1,
    "bar": [
        {
            "foo1": "Field2",
            "foo2": "Value2"
        },
        {
            "foo1": "Field2",
            "foo2": "Value2"
        }
    ]
}{code}
It is easy to spot that `bar` (an array of rows with two strings) consists of
two copies of the last row of the array.
On the other hand, when `foo` is of type `int` instead of `bigint`:
{code:java}
CREATE TABLE wrongArray (
    foo INT,
    bar ARRAY<ROW<`foo1` STRING, `foo2` STRING>>
) WITH (
    'connector' = 'filesystem',
    'path' = 'file://path/to/somewhere',
    'format' = 'json'
) {code}
the same insert yields the correct value:
{code:java}
{
    "foo": 1,
    "bar": [
        {
            "foo1": "Field1",
            "foo2": "Value1"
        },
        {
            "foo1": "Field2",
            "foo2": "Value2"
        }
    ]
}{code}
The bug is reproduced in the following project:
[https://github.com/swtwsk/flink-array-row-bug]
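For reference, a minimal Table API driver for the statements above might look
as follows. This is a sketch only: it assumes a streaming `TableEnvironment` on
Flink 1.15 and elides the DDL and INSERT quoted earlier; `WrongArrayRepro` is a
name of my own.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WrongArrayRepro {

    public static void main(String[] args) throws Exception {
        // Plain TableEnvironment in streaming mode (Flink 1.15 API).
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL and INSERT exactly as quoted above in this report.
        tEnv.executeSql("CREATE TABLE wrongArray ( ... )");
        tEnv.executeSql("INSERT INTO wrongArray ( SELECT ... )").await();
    }
}
{code}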
----
The error is not tied to a specific connector or format. I have done a bit of
debugging while trying to implement my own format, and it seems that the
`BinaryArrayData` holding the row values has wrong data saved in its
`MemorySegment`, i.e. calling:
{code:java}
for (var i = 0; i < array.size(); i++) {
    // every ROW element read here comes back equal to the last row
    Object element = arrayDataElementGetter.getElementOrNull(array, i);
}{code}
correctly calculates offsets but yields the same element for every index,
because the data in the array's `MemorySegment` is malformed. Such a call can
be found, e.g., in `flink-json`, more specifically in
{color:#e8912d}org.apache.flink.formats.json.RowDataToJsonConverters::createArrayConverter{color}
(line 241 in version 1.15.0).
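To illustrate the read path, the converter's array handling boils down to
something like the sketch below. `ReadSketch` and `readElements` are
hypothetical names of mine, but `ArrayData.createElementGetter` and
`ElementGetter.getElementOrNull` are the actual Flink APIs involved:
{code:java}
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.types.logical.LogicalType;

// Hypothetical sketch of what a format-side array converter does:
// resolve an element getter for the element type, then read each
// element of the ArrayData by index.
public final class ReadSketch {

    static Object[] readElements(ArrayData array, LogicalType elementType) {
        ArrayData.ElementGetter getter = ArrayData.createElementGetter(elementType);
        Object[] elements = new Object[array.size()];
        for (int i = 0; i < array.size(); i++) {
            // With the faulty BinaryArrayData described above, every ROW
            // element read here equals the last row of the array.
            elements[i] = getter.getElementOrNull(array, i);
        }
        return elements;
    }
}
{code}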
was:
When using the Table API to insert data into an array of rows, the data is
apparently serialized incorrectly internally, which then surfaces as incorrect
output at the connectors.
E.g., the following table:
{code:java}
CREATE TABLE wrongArray (
    foo BIGINT,
    bar ARRAY<ROW<`foo1` STRING, `foo2` STRING>>,
    strings ARRAY<STRING>,
    intRows ARRAY<ROW<`a` INT, `b` INT>>
) WITH (
    'connector' = 'filesystem',
    'path' = 'file://path/to/somewhere',
    'format' = 'json'
) {code}
along with the following insert:
{code:java}
INSERT INTO wrongArray (
    SELECT
        1,
        ARRAY[
            ('Field1', 'Value1'),
            ('Field2', 'Value2')
        ],
        ARRAY['foo', 'bar', 'foobar'],
        ARRAY[ROW(1, 1), ROW(2, 2)]
    FROM (VALUES(1))
) {code}
gets serialized into:
{code:java}
{
    "foo": 1,
    "bar": [
        {
            "foo1": "Field2",
            "foo2": "Value2"
        },
        {
            "foo1": "Field2",
            "foo2": "Value2"
        }
    ],
    "strings": [
        "foo",
        "bar",
        "foobar"
    ],
    "intRows": [
        {
            "a": 2,
            "b": 2
        },
        {
            "a": 2,
            "b": 2
        }
    ]
}{code}
It is easy to spot that `strings` (an array of strings) yields the correct
values. However, both `bar` (an array of rows with two strings) and `intRows`
(an array of rows with two integers) consist of duplicates of the last row in
the array.
The bug is reproduced in the following project:
[https://github.com/swtwsk/flink-array-row-bug]
----
The error is not tied to a specific connector or format. I have done a bit of
debugging while trying to implement my own format, and it seems that the
`BinaryArrayData` holding the row values has wrong data saved in its
`MemorySegment`, i.e. calling:
{code:java}
for (var i = 0; i < array.size(); i++) {
    // every ROW element read here comes back equal to the last row
    Object element = arrayDataElementGetter.getElementOrNull(array, i);
}{code}
correctly calculates offsets but yields the same element for every index,
because the data in the array's `MemorySegment` is malformed. Such a call can
be found, e.g., in `flink-json`, more specifically in
{color:#e8912d}org.apache.flink.formats.json.RowDataToJsonConverters::createArrayConverter{color}
(line 241 in version 1.15.0).
> Array<Row<...>> is not serialized correctly when BigInt is present
> ------------------------------------------------------------------
>
> Key: FLINK-28591
> URL: https://issues.apache.org/jira/browse/FLINK-28591
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API, Table SQL / Planner
> Affects Versions: 1.15.0
> Reporter: Andrzej Swatowski
> Priority: Major
>
> When using the Table API to insert data into an array of rows, the data is
> apparently serialized incorrectly internally, which then surfaces as
> incorrect output at the connectors. It happens when one of the table fields
> is a BIGINT (and does not happen when it is an INT).
> E.g., the following table:
> {code:java}
> CREATE TABLE wrongArray (
>     foo BIGINT,
>     bar ARRAY<ROW<`foo1` STRING, `foo2` STRING>>
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'file://path/to/somewhere',
>     'format' = 'json'
> ) {code}
> along with the following insert:
> {code:java}
> INSERT INTO wrongArray (
>     SELECT
>         1,
>         ARRAY[
>             ('Field1', 'Value1'),
>             ('Field2', 'Value2')
>         ]
>     FROM (VALUES(1))
> ) {code}
> gets serialized into:
> {code:java}
> {
>     "foo": 1,
>     "bar": [
>         {
>             "foo1": "Field2",
>             "foo2": "Value2"
>         },
>         {
>             "foo1": "Field2",
>             "foo2": "Value2"
>         }
>     ]
> }{code}
> It is easy to spot that `bar` (an array of rows with two strings) consists
> of two copies of the last row of the array.
> On the other hand, when `foo` is of type `int` instead of `bigint`:
> {code:java}
> CREATE TABLE wrongArray (
>     foo INT,
>     bar ARRAY<ROW<`foo1` STRING, `foo2` STRING>>
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = 'file://path/to/somewhere',
>     'format' = 'json'
> ) {code}
> the same insert yields the correct value:
> {code:java}
> {
>     "foo": 1,
>     "bar": [
>         {
>             "foo1": "Field1",
>             "foo2": "Value1"
>         },
>         {
>             "foo1": "Field2",
>             "foo2": "Value2"
>         }
>     ]
> }{code}
> The bug is reproduced in the following project:
> [https://github.com/swtwsk/flink-array-row-bug]
> ----
> The error is not tied to a specific connector or format. I have done a bit
> of debugging while trying to implement my own format, and it seems that the
> `BinaryArrayData` holding the row values has wrong data saved in its
> `MemorySegment`, i.e. calling:
> {code:java}
> for (var i = 0; i < array.size(); i++) {
>     // every ROW element read here comes back equal to the last row
>     Object element = arrayDataElementGetter.getElementOrNull(array, i);
> }{code}
> correctly calculates offsets but yields the same element for every index,
> because the data in the array's `MemorySegment` is malformed. Such a call can
> be found, e.g., in `flink-json`, more specifically in
> {color:#e8912d}org.apache.flink.formats.json.RowDataToJsonConverters::createArrayConverter{color}
> (line 241 in version 1.15.0).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)