[ https://issues.apache.org/jira/browse/FLINK-35053?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pietro updated FLINK-35053:
---------------------------
Description:
The JDBC sink for Postgres supports neither the {{TIMESTAMP WITH TIME ZONE}}
nor the {{TIMESTAMP_LTZ}} type.
Related issues: FLINK-22199, FLINK-20869
h2. Problem Explanation
A Postgres table {{target_table}} has a column {{tm_tz}} of type {{timestamptz}}.
{code:sql}
-- Postgres DDL
CREATE TABLE target_table (
    tm_tz TIMESTAMP WITH TIME ZONE
);
{code}
In Flink we have a table with a column of type {{TIMESTAMP_LTZ(6)}}, and
our goal is to sink it to {{target_table}}.
{code:sql}
-- Flink DDL
CREATE TABLE sink (
    tm_tz TIMESTAMP_LTZ(6)
) WITH (
    'connector' = 'jdbc',
    'table-name' = 'target_table'
    ...
)
{code}
According to
[AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109],
{{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is supported, while
{{TIMESTAMP_WITH_TIME_ZONE}} is not.
However, when the converter is created via
[AbstractJdbcRowConverter.createExternalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246],
an {{UnsupportedOperationException}} is thrown, because
{{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is *not* among the types the converter
handles, while
[{{TIMESTAMP_WITH_TIME_ZONE}}|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L168]
is.
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported type:TIMESTAMP_LTZ(6)
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
    at org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
    at org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51)
    at org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
    at org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
    at org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
    at org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
    at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
{code}
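The mismatch described above can be modeled with a small, self-contained sketch. It is not the connector's code: the {{TypeRoot}} enum, {{declaredSupportedTypes()}} and {{createConverter()}} are hypothetical stand-ins that only mirror the two facts stated in this report (the dialect declares {{TIMESTAMP_WITH_LOCAL_TIME_ZONE}}, the converter handles {{TIMESTAMP_WITH_TIME_ZONE}}).

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical model of the mismatch; NOT the connector's actual code.
class TypeSupportMismatch {

    // Stand-in for the two logical type roots discussed in this report.
    enum TypeRoot {
        TIMESTAMP_WITH_TIME_ZONE,
        TIMESTAMP_WITH_LOCAL_TIME_ZONE
    }

    // Models what the dialect declares as supported, as described above.
    static Set<TypeRoot> declaredSupportedTypes() {
        return EnumSet.of(TypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
    }

    // Models what the converter factory handles, as described above.
    static String createConverter(TypeRoot type) {
        switch (type) {
            case TIMESTAMP_WITH_TIME_ZONE:
                return "timestamp-converter";
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }

    public static void main(String[] args) {
        // Every declared type passes validation, but converter creation can still fail.
        for (TypeRoot type : declaredSupportedTypes()) {
            try {
                System.out.println(type + " -> " + createConverter(type));
            } catch (UnsupportedOperationException e) {
                System.out.println(type + " is declared but fails with: " + e.getMessage());
            }
        }
    }
}
```

In this model the type passes the declared-types check and fails only when the converter is built, which matches the point at which the exception above is raised.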
h3. Using TIMESTAMP WITH TIME ZONE
Defining {{tm_tz}} in Flink as {{TIMESTAMP(6) WITH TIME ZONE}} instead of
{{TIMESTAMP_LTZ(6)}} does not solve the issue: the SQL parser expects the
{{LOCAL}} keyword and rejects the statement with the following error:
{code:java}
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "TIME" at line 1, column 66.
Was expecting:
    "LOCAL" ...
    at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
{code}
h3. Using TIMESTAMP
Defining {{tm_tz}} in Flink as {{TIMESTAMP(6)}} can lead to incorrect time zone
conversions.
For instance, assume that the local time zone is GMT+2 and we have a row in Flink
with {{tm_tz}} equal to {{'2024-04-01 00:00:00'}} (UTC). When the
{{toTimestamp()}} method
([reference|https://github.com/apache/flink-connector-jdbc/blob/ab5d6159141bdbe8aed78e24c9500a136efbfac0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L251])
used by {{AbstractJdbcRowConverter.createExternalConverter()}} is invoked
!createExternalConverter.png|width=80!
it attaches the local time zone offset to the value, instead of "+00":
!TimestampData.png|width=80!
!Timestamp.png|width=80!
Postgres therefore receives {{'2024-04-01 00:00:00+02'}} (instead of +00)
and converts it to {{'2024-03-31 22:00:00+00'}}.
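The two-hour shift in this example can be reproduced with plain JDK classes. The sketch below is only an illustration of the arithmetic, not the connector's code path: {{TimestampShiftDemo}} and {{interpretIn()}} are hypothetical names, and the fixed offset {{+02:00}} stands in for the JVM default zone assumed above.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical demo class; it only reproduces the arithmetic of the example above.
class TimestampShiftDemo {

    // Interprets a zone-less wall-clock value in the given zone.
    // java.sql.Timestamp.valueOf(LocalDateTime) does the same with the JVM default zone.
    static Instant interpretIn(LocalDateTime wallClock, ZoneId zone) {
        return wallClock.atZone(zone).toInstant();
    }

    public static void main(String[] args) {
        LocalDateTime value = LocalDateTime.of(2024, 4, 1, 0, 0, 0);

        // What the user means: the value is a UTC wall-clock time.
        Instant intended = interpretIn(value, ZoneOffset.UTC);
        // What happens when the same value is read in a GMT+2 zone.
        Instant shifted = interpretIn(value, ZoneOffset.ofHours(2));

        System.out.println("intended: " + intended); // intended: 2024-04-01T00:00:00Z
        System.out.println("shifted:  " + shifted);  // shifted:  2024-03-31T22:00:00Z

        // The result of this call depends on the JVM default time zone.
        System.out.println("default zone: " + Timestamp.valueOf(value).toInstant());
    }
}
```

The last line prints the intended instant only when the JVM default zone is UTC, which is why the behavior differs between environments.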
h2. Possible Solutions
# Make the JDBC connector support {{TIMESTAMP_LTZ}} by adding a proper converter to
[AbstractJdbcRowConverter.externalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246],
and remove {{TIMESTAMP_WITH_TIME_ZONE}} from the declared supported types in
[AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109]
# Remove
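As a rough illustration of the first option, a converter for {{TIMESTAMP_LTZ}} could map the absolute instant to a value that carries an explicit UTC offset, so the JVM default zone never enters the conversion. This is a sketch under assumptions, not a proposed patch: {{LtzConverterSketch}} and {{toJdbcValue()}} are hypothetical names, the instant is assumed to come from Flink's internal timestamp value, and the driver is assumed to accept {{OffsetDateTime}} through {{PreparedStatement.setObject()}}.

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

// Hypothetical sketch of a zone-independent conversion; not the connector's code.
class LtzConverterSketch {

    // Maps an absolute instant to a value with an explicit +00:00 offset.
    // The JVM default time zone is never consulted.
    static OffsetDateTime toJdbcValue(Instant instant) {
        return OffsetDateTime.ofInstant(instant, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2024-04-01T00:00:00Z");
        OffsetDateTime value = toJdbcValue(instant);

        // The instant is preserved and the offset is explicit.
        System.out.println(value.toInstant().equals(instant)); // true
        System.out.println(value.getOffset());                 // Z

        // In a real converter the value would be bound to the statement, e.g.
        // statement.setObject(index, value), assuming driver support for OffsetDateTime.
    }
}
```

Because the offset travels with the value, the database would receive {{+00}} regardless of the zone the job runs in.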
was:
The JDBC sink for Postgres does not support {{{}TIMESTAMP WITH TIME ZONE{}}},
nor {{TIMESTAMP_LTZ}} types.
Related issues: FLINK-22199, FLINK-20869
h2. Problem Explanation
A Postgres {{target_table}} has a field {{tm_tz}} of type {{timestamptz}} .
{code:sql}
-- Postgres DDL
CREATE TABLE target_table (
tm_tz TIMESTAMP WITH TIME ZONE
)
{code}
In Flink we have a table with a column of type {{{}TIMESTAMP_LTZ(6){}}}, and
our goal is to sink it to {{{}target_table{}}}.
{code:sql}
-- Flink DDL
CREATE TABLE sink (
tm_tz TIMESTAMP_LTZ(6)
) WITH (
'connector' = 'jdbc',
'table-name' = 'target_table'
...
)
{code}
According to
[AbstractPostgresCompatibleDialect.supportedTypes()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L109],
{{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is supported, while
{{TIMESTAMP_WITH_TIME_ZONE}} is not.
However, when the converter is created via
[AbstractJdbcRowConverter.externalConverter()|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L246],
it throws an {{UnsupportedOperationException}} since
{{TIMESTAMP_WITH_LOCAL_TIME_ZONE}} is *not* among the available types, while
[{{TIMESTAMP_WITH_TIME_ZONE}}|https://github.com/apache/flink-connector-jdbc/blob/7025642d88ff661e486745b23569595e1813a1d0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L168]
is.
{code:java}
Exception in thread "main" java.lang.UnsupportedOperationException: Unsupported
type:TIMESTAMP_LTZ(6)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createInternalConverter(AbstractJdbcRowConverter.java:186)
at
org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createPrimitiveConverter(PostgresRowConverter.java:99)
at
org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.createInternalConverter(PostgresRowConverter.java:58)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.createNullableInternalConverter(AbstractJdbcRowConverter.java:118)
at
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter.<init>(AbstractJdbcRowConverter.java:68)
at
org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresRowConverter.<init>(PostgresRowConverter.java:47)
at
org.apache.flink.connector.jdbc.databases.postgres.dialect.PostgresDialect.getRowConverter(PostgresDialect.java:51)
at
org.apache.flink.connector.jdbc.table.JdbcDynamicTableSource.getScanRuntimeProvider(JdbcDynamicTableSource.java:184)
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:478)
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
at
org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
at
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:4002)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2872)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2432)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2346)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2291)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:728)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:714)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3848)
at
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:618)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:229)
at
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:205)
at
org.apache.flink.table.planner.operations.SqlNodeConvertContext.toRelRoot(SqlNodeConvertContext.java:69)
at
org.apache.flink.table.planner.operations.converters.SqlQueryConverter.convertSqlNode(SqlQueryConverter.java:48)
at
org.apache.flink.table.planner.operations.converters.SqlNodeConverters.convertSqlNode(SqlNodeConverters.java:73)
at
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convertValidatedSqlNode(SqlNodeToOperationConversion.java:272)
at
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:262)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
{code}
h3. Using TIMESTAMP WITH TIME ZONE
Defining {{tm_tz}} in Flink as {{TIMESTAMP(6) WITH TIME ZONE}} instead of
{{TIMESTAMP_LTZ(6)}} does not solve the issue, and returns the following error
instead:
{code:java}
Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL
parse failed. Encountered "TIME" at line 1, column 66.
Was expecting:
"LOCAL" ...
at
org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:81)
at
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:102)
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
{code}
h3. Using TIMESTAMP
Defining {{tm_tz}} in Flink as {{TIMESTAMP(6)}} can lead to potentially
incorrect time zone conversions.
For instance, assume that the local time is GMT+2 and we have a row in Flink
with {{tm_tz}} equal to {{'2024-04-01 00:00:00'}} (UTC). When the
{{toTimestamp()}} method
([reference|https://github.com/apache/flink-connector-jdbc/blob/ab5d6159141bdbe8aed78e24c9500a136efbfac0/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/converter/AbstractJdbcRowConverter.java#L251])
used by {{AbstractJdbcRowConverter.createExternalConverter()}} is invoked
!createExternalConverter.png|width=80%!
it adds the local timezone to it, instead of "+00":
!TimestampData.png|width=80%!
!Timestamp.png|width=80%!
Postgres will therefore receive {{'2024-04-01 00:00:00+02'}} (instead of +00)
and will convert it to {{'2024-03-31 22:00:00+00'.}}
> TIMESTAMP with TIME ZONE not supported by JDBC connector for Postgres
> ---------------------------------------------------------------------
>
> Key: FLINK-35053
> URL: https://issues.apache.org/jira/browse/FLINK-35053
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.19.0, 1.18.1, jdbc-3.1.2
> Reporter: Pietro
> Priority: Major
> Attachments: Timestamp.png, TimestampData.png,
> createExternalConverter.png
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)