[
https://issues.apache.org/jira/browse/FLINK-33110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Roman Lupiichuk updated FLINK-33110:
------------------------------------
Description:
After upgrade from Flink 1.14.0 to 1.15.4 a bunch of our tests started to fail.
I've stripped down one of the failing tests to the following (it's in Kotlin):
{code:java}
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.types.Row
import org.junit.jupiter.api.Test
/**
 * Scalar function used by the reproducer; its output type is declared through the
 * function hint as `ARRAY<ROW<fieldName STRING NOT NULL>>`.
 */
@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
object TestArrayFunc : ScalarFunction() {
    /**
     * Builds one [Row] per argument, in argument order.
     *
     * Each argument is cast to a `Map<String, String?>`, and the value stored under
     * the key `"fieldName"` becomes the single field of the resulting row.
     *
     * @param values the arguments passed to the function (hinted as `InputGroup.ANY`)
     * @return an array holding one single-field row per argument
     */
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> {
        val rows = values.map { argument ->
            // Unchecked cast: the key/value type arguments are not verified at runtime.
            val fieldsByName = argument as Map<String, String?>
            // The lookup yields a nullable String even though the hint declares NOT NULL.
            Row.of(fieldsByName["fieldName"])
        }
        return rows.toTypedArray()
    }
}
/**
 * Reproducer test: runs `TO_FIELDS_ARRAY` over two MAP literals and writes the
 * resulting array, together with `NOW()`, to a `print` sink.
 */
class ArrayFieldTest {
    /**
     * Sets up a table environment with default parallelism 1, registers
     * [TestArrayFunc] as `TO_FIELDS_ARRAY`, creates a single-row `values` source and
     * a `print` sink, and then executes the INSERT and waits for it to finish.
     *
     * The test has no assertions; the result is inspected in the printed sink output.
     */
    @Test
    fun test() {
        val tableEnv = TableEnvironment.create(
            Configuration().also {
                it.setString("table.exec.resource.default-parallelism", "1")
            },
        )
        tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)

        // One inserted row, so the SELECT below emits exactly one result row.
        val dataId = TestValuesTableFactory.registerData(
            listOf(
                TestValuesTableFactory.changelogRow(
                    "+I",
                    "123"
                )
            )
        )

        // The raw SQL strings are kept verbatim (unindented) so the statements sent
        // to Flink are unchanged.
        tableEnv.executeSql(
            """
CREATE TABLE events
(
id STRING
) WITH (
'connector' = 'values',
'data-id' = '$dataId'
)
"""
        )
        tableEnv.executeSql(
            """
CREATE TABLE results
(
fields ARRAY<ROW<fieldName STRING>>,
event_time TIMESTAMP
) WITH (
'connector' = 'print'
)
"""
        )

        // executeSql submits the INSERT job asynchronously; await() blocks until the
        // job has finished so the print sink output is produced before the test returns.
        tableEnv.executeSql(
            """
INSERT INTO results (fields, event_time)
SELECT
TO_FIELDS_ARRAY(
MAP['fieldName', 'foo'],
MAP['fieldName', 'hello']
),
NOW()
FROM events
"""
        ).await()
    }
}
{code}
In Flink 1.14.0 it produces
{code:java}
+I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]{code}
That's correct and expected output.
But in Flink 1.15.4 the output is
{code:java}
+I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]{code}
As one can see, all elements in the array were replaced with duplicates of the
last element.
The issue goes away if I
# either remove NOT NULL constraint from function hint
# or remove TIMESTAMP field from the sink table
There is also no issue in a regular Flink cluster, only in the MiniCluster, which
is used in testing.
was:
After upgrade from Flink 1.14.0 to 1.15.4 a bunch of our tests started to fail.
I've stripped down one of the failing tests to the following (it's in Kotlin):
{code:java}
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.types.Row
import org.junit.jupiter.api.Test
/**
 * Scalar function used by the reproducer; its output type is declared through the
 * function hint as `ARRAY<ROW<fieldName STRING NOT NULL>>`.
 */
@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
object TestArrayFunc : ScalarFunction() {
    /**
     * Builds one [Row] per argument, in argument order.
     *
     * Each argument is cast to a `Map<String, String?>`, and the value stored under
     * the key `"fieldName"` becomes the single field of the resulting row.
     *
     * @param values the arguments passed to the function (hinted as `InputGroup.ANY`)
     * @return an array holding one single-field row per argument
     */
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> {
        val rows = values.map { argument ->
            // Unchecked cast: the key/value type arguments are not verified at runtime.
            val fieldsByName = argument as Map<String, String?>
            // The lookup yields a nullable String even though the hint declares NOT NULL.
            Row.of(fieldsByName["fieldName"])
        }
        return rows.toTypedArray()
    }
}
/**
 * Reproducer test: runs `TO_FIELDS_ARRAY` over two MAP literals and writes the
 * resulting array, together with `NOW()`, to a `print` sink.
 */
class ArrayFieldTest {
    /**
     * Sets up a table environment with default parallelism 1, registers
     * [TestArrayFunc] as `TO_FIELDS_ARRAY`, creates a single-row `values` source and
     * a `print` sink, and then executes the INSERT and waits for it to finish.
     *
     * The test has no assertions; the result is inspected in the printed sink output.
     */
    @Test
    fun test() {
        val tableEnv = TableEnvironment.create(
            Configuration().also {
                it.setString("table.exec.resource.default-parallelism", "1")
            },
        )
        tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)

        // One inserted row, so the SELECT below emits exactly one result row.
        val dataId = TestValuesTableFactory.registerData(
            listOf(
                TestValuesTableFactory.changelogRow(
                    "+I",
                    "123"
                )
            )
        )

        // The raw SQL strings are kept verbatim (unindented) so the statements sent
        // to Flink are unchanged.
        tableEnv.executeSql(
            """
CREATE TABLE events
(
id STRING
) WITH (
'connector' = 'values',
'data-id' = '$dataId'
)
"""
        )
        tableEnv.executeSql(
            """
CREATE TABLE results
(
fields ARRAY<ROW<fieldName STRING>>,
event_time TIMESTAMP
) WITH (
'connector' = 'print'
)
"""
        )

        // executeSql submits the INSERT job asynchronously; await() blocks until the
        // job has finished so the print sink output is produced before the test returns.
        tableEnv.executeSql(
            """
INSERT INTO results (fields, event_time)
SELECT
TO_FIELDS_ARRAY(
MAP['fieldName', 'foo'],
MAP['fieldName', 'hello']
),
NOW()
FROM events
"""
        ).await()
    }
}
{code}
In Flink 1.14.0 it produces
{code:java}
+I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]{code}
That's correct and expected output.
But in Flink 1.15.4 the output is
{code:java}
+I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]{code}
As one can see, all elements in the array were replaced with the last element.
The issue goes away if I
# either remove NOT NULL constraint from function hint
# or remove TIMESTAMP field from the sink table
There is also no issue in a regular Flink cluster, only in the MiniCluster, which
is used in testing.
> Array content gets replaced with last element duplicates
> --------------------------------------------------------
>
> Key: FLINK-33110
> URL: https://issues.apache.org/jira/browse/FLINK-33110
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.15.4
> Reporter: Roman Lupiichuk
> Priority: Minor
>
> After upgrade from Flink 1.14.0 to 1.15.4 a bunch of our tests started to
> fail.
> I've stripped down one of the failing tests to the following (it's in Kotlin):
> {code:java}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.table.annotation.DataTypeHint
> import org.apache.flink.table.annotation.FunctionHint
> import org.apache.flink.table.annotation.InputGroup
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.functions.ScalarFunction
> import org.apache.flink.table.planner.factories.TestValuesTableFactory
> import org.apache.flink.types.Row
> import org.junit.jupiter.api.Test
> @FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
> object TestArrayFunc : ScalarFunction() {
> fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any):
> Array<Row> =
> values
> .map { data ->
> val casted = data as Map<String, String?>
> Row.of(casted["fieldName"])
> }
> .toTypedArray()
> }
> class ArrayFieldTest {
> @Test
> fun test() {
> val tableEnv = TableEnvironment.create(
> Configuration().also {
> it.setString("table.exec.resource.default-parallelism", "1")
> },
> )
> tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY",
> TestArrayFunc)
> val dataId = TestValuesTableFactory.registerData(
> listOf(
> TestValuesTableFactory.changelogRow(
> "+I",
> "123"
> )
> )
> )
> tableEnv.executeSql(
> """
> CREATE TABLE events
> (
> id STRING
> ) WITH (
> 'connector' = 'values',
> 'data-id' = '$dataId'
> )
> """
> )
> tableEnv.executeSql(
> """
> CREATE TABLE results
> (
> fields ARRAY<ROW<fieldName STRING>>,
> event_time TIMESTAMP
> ) WITH (
> 'connector' = 'print'
> )
> """
> )
> tableEnv.executeSql(
> """
> INSERT INTO results (fields, event_time)
> SELECT
> TO_FIELDS_ARRAY(
> MAP['fieldName', 'foo'],
> MAP['fieldName', 'hello']
> ),
> NOW()
> FROM events
> """
> )
> }
> }
> {code}
> In Flink 1.14.0 it produces
> {code:java}
> +I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]{code}
> That's correct and expected output.
> But in Flink 1.15.4 the output is
> {code:java}
> +I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]{code}
> As one can see, all elements in the array were replaced with duplicates of the
> last element.
> The issue goes away if I
> # either remove NOT NULL constraint from function hint
> # or remove TIMESTAMP field from the sink table
> There is also no issue in a regular Flink cluster, only in the MiniCluster, which
> is used in testing.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)