[ 
https://issues.apache.org/jira/browse/FLINK-35907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-35907:
-----------------------------------
    Labels: pull-request-available  (was: )

> Inconsistent handling of bit and bit(n) types in Flink CDC PostgreSQL 
> Connector
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-35907
>                 URL: https://issues.apache.org/jira/browse/FLINK-35907
>             Project: Flink
>          Issue Type: Bug
>         Environment: I use the following Maven settings.
> {code:java}
> <dependency>
>     <groupId>com.ververica</groupId>
>     <artifactId>flink-connector-postgres-cdc</artifactId>
>     <version>3.0.1</version>
> </dependency> {code}
> PostgreSQL Server Version: I have tested on 14.0, 15.0, and 16.0. This error 
> occurs regardless of the PG version.
> JDK: Java 8.
>            Reporter: jiangcheng
>            Priority: Major
>              Labels: pull-request-available
>   Original Estimate: 168h
>  Remaining Estimate: 168h
>
> I am encountering issues with the Flink CDC PostgreSQL Connector's handling 
> of {{bit}} and {{bit(n, where n>1)}} data types, which seem to deviate from 
> expected behavior, leading to inconsistencies and errors during both snapshot 
> and incremental sync phases.
> h1. Problems
> h2. *Problem 1: Misinterpretation of {{bit}} and {{bit(1)}}*
> During data retrieval, {{bit}} and {{bit(1)}} types are interpreted as 
> {{{}org.apache.kafka.connect.data.Schema.BOOLEAN{}}}, mirroring the treatment 
> of PostgreSQL's native {{bool}} or {{boolean}} types. This may lead to loss 
> of precision when the actual intention could be to preserve the bit pattern, 
> rather than a simple true/false value.
> h2. *Problem 2: Error with {{bit(n, where n>1)}} during Snapshot Phase*
> For {{{}bit(n, where n>1){}}}, an error is encountered during the snapshot 
> phase within {{{}PostgresScanFetchTask{}}}. The issue arises from attempting 
> to read these values using {{{}PgResult#getBoolean{}}}, which is 
> inappropriate for {{bit(n, where n>1)}} types not representing a standard 
> boolean state. Notably, this error does not surface during the incremental 
> phase, because {{PgResult#getBoolean}} is never invoked there.
> h1. My Analysis
> h2. BIT type is interpreted as BOOLEAN
> Diving into the code reveals that for both scenarios, the connector relies on 
> {{PgResult#getObject}}, which internally identifies {{bit}} and {{bit(n, where 
> n>1)}} as {{java.sql.Types.BOOLEAN}}. This misclassification triggers the 
> problematic usage of {{getBoolean}} for non-standard boolean representations 
> like {{bit(n, where n>1)}}.
> h2. Inconsistency between Snapshot and Incremental Phases
> The discrepancy between the snapshot phase and incremental phase is 
> noteworthy. During the snapshot phase, errors occur because 
> {{PostgresScanFetchTask#createDataEventsForTable}} calls 
> {{PgResult#getObject}} directly, and {{PgResult#getObject}} in turn 
> delegates to {{PgResult#getBoolean}}.
> Conversely, in the incremental phase, {{bit(n, where n>1)}} values are 
> coerced into {{org.apache.kafka.connect.data.Schema.BYTES}}, resulting in 
> a loss of the original {{n}} precision information. This forces consumers to 
> assume a whole-byte representation, obscuring the intended bit-width 
> and potentially leading to incorrect interpretations (e.g., when I insert the 
> {{bit(10)}} value {{'0001111111'}} into PostgreSQL, it is represented in the 
> SourceRecord as a byte array of length 1 whose only element is 127).
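
The width loss described above can be illustrated with a minimal sketch. It assumes a little-endian packing that drops high-order zero bytes, which matches the single-byte result reported above but is not taken from the connector's source; `BitWidthLoss` and `pack` are hypothetical names used only for illustration.

```java
import java.util.Arrays;
import java.util.BitSet;

final class BitWidthLoss {

    // Hypothetical helper (assumption, not connector code): packs a
    // PostgreSQL bit string into a minimal little-endian byte array.
    static byte[] pack(String bits) {
        BitSet set = new BitSet(bits.length());
        for (int i = 0; i < bits.length(); i++) {
            // The last character of the string is bit 0 (least significant).
            if (bits.charAt(bits.length() - 1 - i) == '1') {
                set.set(i);
            }
        }
        // BitSet#toByteArray omits high-order zero bytes, so leading
        // zeros of the declared width are not represented in the output.
        return set.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(pack("0001111111"))); // bit(10)
        System.out.println(Arrays.toString(pack("01111111")));   // bit(8)
        System.out.println(Arrays.toString(pack("1111111")));    // bit(7)
    }
}
```

Under this assumption, the `bit(10)`, `bit(8)`, and `bit(7)` values all pack to the same single byte, so the declared width cannot be recovered from the bytes alone.
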
> h1. *My Opinion*
> From my perspective, the following approaches may solve this problem.
> *Consistent Handling:* The connector should uniformly and accurately handle 
> all {{bit}} variants, respecting their distinct PostgreSQL definitions.
> *Preserve Precision:* For {{{}bit(n, where n>1){}}}, ensure that the 
> precision {{n}} is maintained throughout processing, allowing consumers to 
> correctly interpret the intended bit sequence without assuming a default byte 
> size.
> *Schema Transparency:* Enhance metadata handling to communicate the original 
> {{bit(n, where n>1)}} schema accurately to downstream systems, enabling them 
> to process the data with full knowledge of its intended format.
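
As a rough sketch of what preserving the precision would enable on the consumer side: if the declared width `n` reached the consumer by some means (for example as a schema parameter, which is an assumption and not something this issue specifies), the original bit string could be rebuilt exactly. `BitStringDecoder` and `toBitString` are hypothetical names, and little-endian byte order is assumed.

```java
final class BitStringDecoder {

    // Hypothetical helper (assumption, not connector code): rebuilds the
    // bit string of exactly `width` characters from little-endian bytes.
    // Bytes missing on the high-order side are treated as zero.
    static String toBitString(byte[] littleEndian, int width) {
        StringBuilder sb = new StringBuilder(width);
        for (int i = width - 1; i >= 0; i--) {
            int byteIndex = i / 8;
            boolean isSet = byteIndex < littleEndian.length
                    && ((littleEndian[byteIndex] >> (i % 8)) & 1) == 1;
            sb.append(isSet ? '1' : '0');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The single byte 127 from the bit(10) example in this issue.
        System.out.println(toBitString(new byte[] {127}, 10)); // 0001111111
    }
}
```

With the width supplied explicitly, the consumer no longer has to guess how many leading zeros belong to the value.
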
> h1. Conclusion
> Addressing these discrepancies will not only improve the reliability of the 
> Flink CDC PostgreSQL Connector but also enhance its compatibility with a 
> broader range of use cases that rely on precise {{bit}} data handling. I look 
> forward to a resolution that ensures consistent and accurate processing 
> across both snapshot and incremental modes.
> Thank you for considering this issue, and I'm available to provide further 
> details or assist in any way possible.
> h1. Reproduce the Error
> I use the following code to read records from the PostgreSQL Connector by 
> implementing the deserialize method in 
> {{DebeziumDeserializationSchema<Record>}}:
> {code:java}
> public class PostgreSqlRecordSourceDeserializeSchema
>         implements DebeziumDeserializationSchema<Record> {
>
>     public void deserialize(SourceRecord sourceRecord, Collector<Record> out)
>             throws Exception {
>         // skipping irrelevant business logic ...
>         Struct rowValue = ((Struct) sourceRecord.value())
>                 .getStruct(Envelope.FieldName.AFTER);
>
>         for (Field field : rowValue.schema().fields()) {
>             switch (field.schema().type()) {
>                 case BOOLEAN:
>                     // handling bit/boolean type
>                     Boolean value = rowValue.getBoolean(field.name());
>                     break;
>                 case BYTES:
>                     if (StringUtils.equals(field.schema().name(),
>                             Bits.class.getName())) {
>                         // handling bit(n) type, where n > 1
>                         String byteString = "";
>                         for (byte b : rowValue.getBytes(field.name())) {
>                             // negative bytes yield 32 characters; keep the low 8
>                             String tmpByteString = String.format("%8s",
>                                     Integer.toBinaryString(b)).replace(' ', '0');
>                             if (tmpByteString.length() == 32) {
>                                 tmpByteString = tmpByteString.substring(24);
>                             }
>                             // bytes are little-endian, so prepend each one
>                             byteString = tmpByteString + byteString;
>                         }
>                     } else // skipping irrelevant business logic ...
>                     break;
>                 case // skipping irrelevant business logic ...
>             }
>         }
>         // skipping irrelevant business logic ...
>     }
> }  {code}
> h1. Version
> I use the following Maven settings.
> {code:java}
> <dependency>
>     <groupId>com.ververica</groupId>
>     <artifactId>flink-connector-postgres-cdc</artifactId>
>     <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
> </dependency> {code}
> PostgreSQL Server Version: I have tested on 14.0, 15.0, and 16.0. This error 
> occurs regardless of the PG version.
> JDK: Java 8.
> h1. Offer for Assistance
> I am willing to provide additional test scenarios or results to help diagnose 
> this issue further. Moreover, I am open to collaborating on reviewing 
> potential fixes or providing any necessary feedback to ensure a comprehensive 
> resolution to this discrepancy. 
> Thank you for your attention to this issue, and I look forward to working 
> together towards enhancing the reliability and accuracy of the Flink CDC 
> PostgreSQL Connector.
> Best regards,
> AldrichZeng(曾曜)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
