[ https://issues.apache.org/jira/browse/FLINK-33110?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Lupiichuk updated FLINK-33110:
------------------------------------
    Description: 
After upgrading from Flink 1.14.0 to 1.15.4, a number of our tests started to fail.

I've stripped down one of the failing tests to the following (it's in Kotlin):
{code:java}
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.planner.factories.TestValuesTableFactory
import org.apache.flink.types.Row
import org.junit.jupiter.api.Test

@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING NOT NULL>>"))
object TestArrayFunc : ScalarFunction() {
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
        values
            .map { data ->
                // The ANY input group erases the type, so cast back to the map we pass in.
                @Suppress("UNCHECKED_CAST")
                val casted = data as Map<String, String?>
                Row.of(casted["fieldName"])
            }
            .toTypedArray()
}

class ArrayFieldTest {
    @Test
    fun test() {
        val tableEnv = TableEnvironment.create(
            Configuration().also {
                it.setString("table.exec.resource.default-parallelism", "1")
            },
        )
        tableEnv.createTemporarySystemFunction("TO_FIELDS_ARRAY", TestArrayFunc)

        val dataId = TestValuesTableFactory.registerData(
            listOf(
                TestValuesTableFactory.changelogRow(
                    "+I",
                    "123"
                )
            )
        )
        tableEnv.executeSql(
            """
                CREATE TABLE events
                (
                    id STRING
                ) WITH (
                    'connector' = 'values',
                    'data-id' = '$dataId'
                )
            """
        )
        tableEnv.executeSql(
            """
                CREATE TABLE results
                (
                    fields ARRAY<ROW<fieldName STRING>>,
                    event_time TIMESTAMP
                ) WITH (
                    'connector' = 'print'
                )
            """
        )

        tableEnv.executeSql(
            """
                INSERT INTO results (fields, event_time)
                SELECT
                    TO_FIELDS_ARRAY(
                       MAP['fieldName', 'foo'],
                       MAP['fieldName', 'hello']
                    ),
                    NOW()
                FROM events
            """
        )
    }
}
 {code}
In Flink 1.14.0 this test produces:
{code:java}
+I[[+I[foo], +I[hello]], 2023-09-18T08:18:55.278]{code}
That's the correct and expected output.

But in Flink 1.15.4 the output is:
{code:java}
+I[[+I[hello], +I[hello]], 2023-09-18T08:21:12.569]{code}
As one can see, all elements in the array were replaced with duplicates of the last element.
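
This looks like mutable-object aliasing during array materialization rather than wrong data: if a single reused element instance gets written into every slot, all slots end up showing the last value. Below is a plain-Kotlin sketch of that pattern (an illustration of the symptom only, not a claim about Flink's actual code path):
{code:java}
// Illustration only (no Flink involved): reusing one mutable holder for every
// array element reproduces the "all slots equal the last element" symptom.
class MutableHolder(var value: String? = null)

fun main() {
    val inputs = listOf("foo", "hello")

    // Aliasing pattern: apply() returns the same reused instance each time,
    // so both array slots reference one object holding the last value written.
    val reused = MutableHolder()
    val aliased = inputs.map { reused.apply { value = it } }.toTypedArray()
    println(aliased.map { it.value }) // [hello, hello]

    // Safe pattern: allocate (or deep-copy) a holder per element.
    val fresh = inputs.map { MutableHolder(it) }.toTypedArray()
    println(fresh.map { it.value })   // [foo, hello]
}
{code}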

The issue goes away if I
 # either remove the NOT NULL constraint from the function hint (see the sketch after this list)
 # or remove the TIMESTAMP field from the sink table
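
For reference, here is the first workaround spelled out as a minimal sketch ({{TestArrayFuncNullable}} is a name I'm introducing here; the only change from the UDF above is the dropped NOT NULL):
{code:java}
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.annotation.FunctionHint
import org.apache.flink.table.annotation.InputGroup
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Identical to TestArrayFunc except the output hint no longer declares the
// row field NOT NULL; with this hint the test passes again on 1.15.4.
@FunctionHint(output = DataTypeHint("ARRAY<ROW<fieldName STRING>>"))
object TestArrayFuncNullable : ScalarFunction() {
    fun eval(@DataTypeHint(inputGroup = InputGroup.ANY) vararg values: Any): Array<Row> =
        values
            .map { data ->
                @Suppress("UNCHECKED_CAST")
                val casted = data as Map<String, String?>
                Row.of(casted["fieldName"])
            }
            .toTypedArray()
}
{code}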

There is also no issue in a regular Flink cluster, only in the MiniCluster used for testing.

> Array content gets replaced with last element duplicates
> --------------------------------------------------------
>
>                 Key: FLINK-33110
>                 URL: https://issues.apache.org/jira/browse/FLINK-33110
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.15.4
>            Reporter: Roman Lupiichuk
>            Priority: Minor