[
https://issues.apache.org/jira/browse/FLINK-35907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jiangcheng updated FLINK-35907:
-------------------------------
Environment:
I use the following maven settings.
{code:xml}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>3.0.1</version>
</dependency>
{code}
PostgreSQL Server Version: tested on 14.0, 15.0, and 16.0; the error occurs
regardless of the PostgreSQL version.
JDK: Java 8.
was:
I use the following maven settings.
{code:xml}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
</dependency>
{code}
PostgreSQL Server Version: tested on 14.0, 15.0, and 16.0; the error occurs
regardless of the PostgreSQL version.
JDK: Java 8.
> Inconsistent handling of bit and bit(n) types in Flink CDC PostgreSQL
> Connector
> -------------------------------------------------------------------------------
>
> Key: FLINK-35907
> URL: https://issues.apache.org/jira/browse/FLINK-35907
> Project: Flink
> Issue Type: Bug
> Environment: I use the following maven settings.
> {code:xml}
> <dependency>
>     <groupId>com.ververica</groupId>
>     <artifactId>flink-connector-postgres-cdc</artifactId>
>     <version>3.0.1</version>
> </dependency>
> {code}
> PostgreSQL Server Version: tested on 14.0, 15.0, and 16.0; the error
> occurs regardless of the PostgreSQL version.
> JDK: Java 8.
> Reporter: jiangcheng
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> I am encountering issues with the Flink CDC PostgreSQL Connector's handling
> of the {{bit}} and {{bit(n, where n>1)}} data types. The behavior deviates
> from what is expected and leads to inconsistencies and errors during both
> the snapshot and incremental sync phases.
> h1. Problems
> h2. *Problem 1: Misinterpretation of {{bit}} and {{bit(1)}}*
> During data retrieval, the {{bit}} and {{bit(1)}} types are interpreted as
> {{org.apache.kafka.connect.data.Schema.BOOLEAN}}, mirroring the treatment
> of PostgreSQL's native {{bool}}/{{boolean}} type. This may lose information
> when the intention is to preserve the bit pattern rather than a simple
> true/false value.
> h2. *Problem 2: Error with {{bit(n, where n>1)}} during the Snapshot Phase*
> For {{bit(n, where n>1)}}, an error is encountered during the snapshot
> phase within {{PostgresScanFetchTask}}. The issue arises from attempting to
> read these values using {{PgResultSet#getBoolean}}, which is inappropriate
> for {{bit(n, where n>1)}} values that do not represent a standard boolean
> state. Notably, this error does not surface during the incremental phase,
> because {{PgResultSet#getBoolean}} is never invoked there.
> h1. My Analysis
> h2. BIT type is interpreted as BOOLEAN
> Diving into the code reveals that, in both scenarios, the connector relies
> on {{PgResultSet#getObject}}, which internally classifies {{bit}} and
> {{bit(n, where n>1)}} as {{java.sql.Types.BOOLEAN}}. This misclassification
> triggers the problematic use of {{getBoolean}} for non-boolean
> representations such as {{bit(n, where n>1)}}.
> h2. Inconsistency between Snapshot and Incremental Phases
> The discrepancy between the snapshot and incremental phases is noteworthy.
> During the snapshot phase, errors manifest through the direct call to
> {{PgResultSet#getObject}} in
> {{PostgresScanFetchTask#createDataEventsForTable}}, which in turn delegates
> to {{PgResultSet#getBoolean}}. Conversely, in the incremental phase,
> {{bit(n, where n>1)}} values are coerced into
> {{org.apache.kafka.connect.data.Schema.BYTES}}, losing the original
> precision {{n}}. This forces consumers to assume an 8-bit byte array
> representation, obscuring the intended bit width and potentially leading to
> incorrect interpretations (e.g., I inserted the {{bit(10)}} value
> {{'0001111111'}} into PostgreSQL Server; it is represented in the
> SourceRecord as a byte array of length 1 whose first element is 127).
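> The width loss can be illustrated without a database. The following
> stdlib-only sketch (my own illustration, not connector code) packs an
> MSB-first bit string into the little-endian byte layout that Debezium uses
> for {{bit(n)}} values with n > 1; the declared width n = 10 cannot be
> recovered from the packed bytes alone:
> {code:java}
> public class BitWidthLoss {
>     // Pack an MSB-first bit string into a little-endian byte array,
>     // the layout Debezium uses for bit(n) values with n > 1.
>     static byte[] pack(String bits) {
>         byte[] out = new byte[(bits.length() + 7) / 8];
>         for (int i = 0; i < bits.length(); i++) {
>             // bit i, counted from the least significant end of the string
>             if (bits.charAt(bits.length() - 1 - i) == '1') {
>                 out[i / 8] |= (byte) (1 << (i % 8));
>             }
>         }
>         return out;
>     }
>
>     public static void main(String[] args) {
>         byte[] packed = pack("0001111111"); // the bit(10) value from above
>         System.out.println(packed.length);  // 2 -- a plain packing needs two bytes
>         System.out.println(packed[0]);      // 127 -- the low 8 bits
>         System.out.println(packed[1]);      // 0 -- the two remaining bits
>         // The packed bytes describe 16 candidate bit positions; the declared
>         // width n = 10 is gone unless it is carried as extra metadata.
>     }
> } {code}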
> h1. *My Opinion*
> From my perspective, the following approaches may solve this problem.
> * *Consistent Handling:* The connector should uniformly and accurately
> handle all {{bit}} variants, respecting their distinct PostgreSQL
> definitions.
> * *Preserve Precision:* For {{bit(n, where n>1)}}, ensure that the
> precision {{n}} is maintained throughout processing, allowing consumers to
> correctly interpret the intended bit sequence without assuming a default
> byte size.
> * *Schema Transparency:* Enhance metadata handling to communicate the
> original {{bit(n, where n>1)}} schema accurately to downstream systems,
> enabling them to process the data with full knowledge of its intended
> format.
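> To make the precision-preserving idea concrete, here is a hypothetical
> sketch ({{BitValue}} is an illustrative type of my own, not an existing
> connector class) of carrying the declared width alongside the packed bytes
> so the original bit string can be reconstructed exactly:
> {code:java}
> public class BitValue {
>     final byte[] bytes;  // little-endian packed bits
>     final int length;    // declared bit(n) width
>
>     BitValue(byte[] bytes, int length) {
>         this.bytes = bytes;
>         this.length = length;
>     }
>
>     // Rebuild the original MSB-first bit string using the stored width.
>     String toBitString() {
>         StringBuilder sb = new StringBuilder(length);
>         for (int i = length - 1; i >= 0; i--) {
>             sb.append(((bytes[i / 8] >> (i % 8)) & 1) == 1 ? '1' : '0');
>         }
>         return sb.toString();
>     }
>
>     public static void main(String[] args) {
>         // bytes for bit(10) '0001111111', plus the declared width n = 10
>         BitValue v = new BitValue(new byte[] {127, 0}, 10);
>         System.out.println(v.toBitString()); // prints 0001111111
>     }
> } {code}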
> h1. Conclusion
> Addressing these discrepancies will not only improve the reliability of the
> Flink CDC PostgreSQL Connector but also enhance its compatibility with a
> broader range of use cases that rely on precise {{bit}} data handling. I look
> forward to a resolution that ensures consistent and accurate processing
> across both snapshot and incremental modes.
> Thank you for considering this issue, and I'm available to provide further
> details or assist in any way possible.
> h1. Reproducing the Error
> I use the following code to read records from the PostgreSQL connector by
> implementing the deserialize method of
> {{DebeziumDeserializationSchema<Record>}}:
> {code:java}
> public class PostgreSqlRecordSourceDeserializeSchema
>         implements DebeziumDeserializationSchema<Record> {
>
>     @Override
>     public void deserialize(SourceRecord sourceRecord, Collector<Record> out)
>             throws Exception {
>         // skipping irrelevant business logic ...
>         Struct rowValue =
>                 ((Struct) sourceRecord.value()).getStruct(Envelope.FieldName.AFTER);
>
>         for (Field field : rowValue.schema().fields()) {
>             switch (field.schema().type()) {
>                 case BOOLEAN:
>                     // handling bit/bit(1)/boolean types
>                     Boolean value = rowValue.getBoolean(field.name());
>                     break;
>                 case BYTES:
>                     if (StringUtils.equals(field.schema().name(), Bits.class.getName())) {
>                         // handling bit(n), n > 1: rebuild the bit string
>                         // from the little-endian byte array
>                         String byteString = "";
>                         for (byte b : rowValue.getBytes(field.name())) {
>                             // negative bytes are sign-extended to 32 bits,
>                             // so keep only the low 8 bits
>                             String tmpByteString = String.format("%8s",
>                                     Integer.toBinaryString(b)).replace(' ', '0');
>                             if (tmpByteString.length() == 32) {
>                                 tmpByteString = tmpByteString.substring(24);
>                             }
>                             byteString = tmpByteString + byteString;
>                         }
>                     } else {
>                         // skipping irrelevant business logic ...
>                     }
>                     break;
>                 default:
>                     // skipping irrelevant business logic ...
>             }
>         }
>     }
> } {code}
> h1. Version
> I use the following maven settings.
> {code:xml}
> <dependency>
>     <groupId>com.ververica</groupId>
>     <artifactId>flink-connector-postgres-cdc</artifactId>
>     <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
> </dependency>
> {code}
> PostgreSQL Server Version: tested on 14.0, 15.0, and 16.0; the error
> occurs regardless of the PostgreSQL version.
> JDK: Java 8.
> h1. Offer for Assistance
> I am willing to provide additional test scenarios or results to help diagnose
> this issue further. Moreover, I am open to collaborating on reviewing
> potential fixes or providing any necessary feedback to ensure a comprehensive
> resolution to this discrepancy.
> Thank you for your attention to this issue, and I look forward to working
> together towards enhancing the reliability and accuracy of the Flink CDC
> PostgreSQL Connector.
> Best regards,
> AldrichZeng(曾曜)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)