[
https://issues.apache.org/jira/browse/FLINK-25077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sergey Nuyanzin updated FLINK-25077:
------------------------------------
Description:
On Postgres
{code:sql}
CREATE TABLE sal_emp (
name VARCHAR,
pay_by_quarter INT[],
schedule VARCHAR[][]
);
INSERT INTO sal_emp VALUES ('test', ARRAY[1], ARRAY[ARRAY['nested']]);
{code}
On Flink
{code:sql}
CREATE TABLE flink_sal_emp (
name STRING,
pay_by_quarter ARRAY<INT>,
schedule ARRAY<ARRAY<STRING>>
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/postgres',
'table-name' = 'sal_emp',
'username' = 'postgres',
'password' = 'postgres'
);
SELECT * FROM default_catalog.default_database.flink_sal_emp ;
{code}
Result
{noformat}
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class [Ljava.lang.String; cannot be cast to class
org.postgresql.jdbc.PgArray ([Ljava.lang.String; is in module java.base of
loader 'bootstrap'; org.postgresql.jdbc.PgArray is in unnamed module of loader
'app')
at
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:104)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
at
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:108)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:330)
{noformat}
was:
On Postgres
{code:sql}
create table sal_emp (
name varchar,
pay_by_quarter int[],
schedule varchar[][]
);
insert into sal_emp values ('test', array[1], array[array['nested']]);
{code}
on Flink
{code:sql}
CREATE TABLE flink_sal_emp (
name string,
pay_by_quarter array<INT>,
schedule array<array<string>>
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/postgres',
'table-name' = 'sal_emp',
'username' = 'postgres',
'password' = 'postgres'
);
select * from default_catalog.default_database.flink_sal_emp ;
{code}
result
{noformat}
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassCastException: class [Ljava.lang.String; cannot be cast to class
org.postgresql.jdbc.PgArray ([Ljava.lang.String; is in module java.base of
loader 'bootstrap'; org.postgresql.jdbc.PgArray is in unnamed module of loader
'app')
at
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:104)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
at
org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:108)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257)
at
org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56)
at
org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:330)
{noformat}
> PostgreSQL connector fails on columns with nested arrays
> --------------------------------------------------------
>
> Key: FLINK-25077
> URL: https://issues.apache.org/jira/browse/FLINK-25077
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Reporter: Sergey Nuyanzin
> Priority: Major
>
> On Postgres
> {code:sql}
> CREATE TABLE sal_emp (
> name VARCHAR,
> pay_by_quarter INT[],
> schedule VARCHAR[][]
> );
> INSERT INTO sal_emp VALUES ('test', ARRAY[1], ARRAY[ARRAY['nested']]);
> {code}
> On Flink
> {code:sql}
> CREATE TABLE flink_sal_emp (
> name STRING,
> pay_by_quarter ARRAY<INT>,
> schedule ARRAY<ARRAY<STRING>>
> ) WITH (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:postgresql://localhost:5432/postgres',
> 'table-name' = 'sal_emp',
> 'username' = 'postgres',
> 'password' = 'postgres'
> );
> SELECT * FROM default_catalog.default_database.flink_sal_emp ;
> {code}
> Result
> {noformat}
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.ClassCastException: class [Ljava.lang.String; cannot be cast to
> class org.postgresql.jdbc.PgArray ([Ljava.lang.String; is in module java.base
> of loader 'bootstrap'; org.postgresql.jdbc.PgArray is in unnamed module of
> loader 'app')
> at
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:104)
> at
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
> at
> org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$2(PostgresRowConverter.java:108)
> at
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.lambda$wrapIntoNullableInternalConverter$ea5b8348$1(AbstractJdbcRowConverter.java:127)
> at
> org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.toInternal(AbstractJdbcRowConverter.java:78)
> at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:257)
> at
> org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat.nextRecord(JdbcRowDataInputFormat.java:56)
> at
> org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:330)
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)