[ https://issues.apache.org/jira/browse/FLINK-35907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-35907:
-----------------------------------
    Labels: pull-request-available  (was: )

Inconsistent handling of bit and bit(n) types in Flink CDC PostgreSQL Connector
-------------------------------------------------------------------------------

                Key: FLINK-35907
                URL: https://issues.apache.org/jira/browse/FLINK-35907
            Project: Flink
         Issue Type: Bug
        Environment: I use the following Maven dependency:
{code:java}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>3.0.1</version>
</dependency>
{code}
PostgreSQL server version: tested on 14.0, 15.0, and 16.0; the error occurs regardless of the PostgreSQL version.
JDK: Java 8.
           Reporter: jiangcheng
           Priority: Major
             Labels: pull-request-available
  Original Estimate: 168h
 Remaining Estimate: 168h

I am encountering issues with the Flink CDC PostgreSQL Connector's handling of the {{bit}} and {{bit(n)}} (n > 1) data types, which deviates from expected behavior and leads to inconsistencies and errors during both the snapshot and incremental sync phases.

h1. Problems

h2. Problem 1: Misinterpretation of {{bit}} and {{bit(1)}}

During data retrieval, {{bit}} and {{bit(1)}} are interpreted as {{org.apache.kafka.connect.data.Schema.BOOLEAN}}, mirroring the treatment of PostgreSQL's native {{bool}}/{{boolean}} type. This may lose information when the intention is to preserve the bit pattern rather than a simple true/false value.

h2. Problem 2: Error with {{bit(n)}} (n > 1) during the Snapshot Phase

For {{bit(n)}} with n > 1, an error is encountered during the snapshot phase within {{PostgresScanFetchTask}}. The issue arises from attempting to read these values via {{PgResult#getBoolean}}, which is inappropriate for {{bit(n)}} values that do not represent a standard boolean state.
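To make Problem 2 concrete: a boolean read is only well-defined for a single bit. The sketch below (a hypothetical helper for illustration, not connector code; the class and method names are my own) mirrors why a {{getBoolean}}-style access works for {{bit(1)}} but has no sensible result for {{bit(n)}} with n > 1:

```java
public class BitBooleanDemo {
    // Hypothetical helper (illustration only, not connector code): converting
    // a PostgreSQL bit string to a boolean is well-defined only when it has
    // exactly one bit, which is why getBoolean-style reads must fail for
    // bit(n) with n > 1.
    static boolean bitAsBoolean(String bitValue) {
        if (bitValue.length() != 1) {
            throw new IllegalArgumentException(
                    "bit(" + bitValue.length() + ") value '" + bitValue
                            + "' has no single boolean interpretation");
        }
        return bitValue.charAt(0) == '1';
    }

    public static void main(String[] args) {
        System.out.println(bitAsBoolean("1"));  // bit(1): fine, prints true
        System.out.println(bitAsBoolean("0"));  // bit(1): fine, prints false
        try {
            bitAsBoolean("101");                // bit(3): no boolean meaning
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```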
Strangely, this error does not surface during the incremental phase, because {{PgResult#getBoolean}} is never invoked there.

h1. My Analysis

h2. BIT type is interpreted as BOOLEAN

Diving into the code reveals that in both scenarios the connector relies on {{PgResult#getObject}}, which internally classifies both {{bit}} and {{bit(n)}} (n > 1) as {{java.sql.Types.BOOLEAN}}. This misclassification triggers the problematic use of {{getBoolean}} for non-boolean representations such as {{bit(n)}} (n > 1).

h2. Inconsistency between Snapshot and Incremental Phases

The discrepancy between the snapshot and incremental phases is noteworthy. During the snapshot phase, errors manifest through the direct call to {{PgResult#getObject}} in {{PostgresScanFetchTask#createDataEventsForTable}}, which is further forwarded to {{PgResult#getBoolean}}. Conversely, in the incremental phase, {{bit(n)}} (n > 1) values are coerced into {{org.apache.kafka.connect.data.Schema.BYTES}}, losing the original precision information {{n}}. This forces consumers to assume an 8-bit byte array representation, obscuring the intended bit width and potentially leading to incorrect interpretations. For example, when I insert the {{bit(10)}} value {{'0001111111'}} into the PostgreSQL server, it appears in the SourceRecord as a byte array of length 1 whose first element is 127.

h1. My Opinion

From my perspective, the following approaches may solve this problem.

*Consistent handling:* The connector should uniformly and accurately handle all {{bit}} variants, respecting their distinct PostgreSQL definitions.

*Preserve precision:* For {{bit(n)}} (n > 1), ensure that the precision {{n}} is maintained throughout processing, allowing consumers to correctly interpret the intended bit sequence without assuming a default byte size.
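The precision loss can be sketched without the connector at all. Assuming the BYTES value is simply the two's-complement packing of the bit string (an illustration via {{BigInteger}}, not necessarily the connector's exact encoding), several distinct {{bit(n)}} values collapse to the same byte array; only if the declared width {{n}} were also carried in the schema could a consumer rebuild the exact string, e.g. with a helper like the hypothetical {{bitsToString}} below (least-significant byte first, as in the reproduction code later in this issue):

```java
import java.math.BigInteger;
import java.util.Arrays;

public class BitWidthDemo {
    // Hypothetical consumer-side helper: rebuild the exact bit(n) string from
    // the byte array, assuming least-significant byte first. The declared
    // width n must come from somewhere else -- it is exactly the information
    // the BYTES schema currently drops.
    static String bitsToString(byte[] bytes, int n) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            // b & 0xFF avoids the 32-character result that
            // Integer.toBinaryString produces for negative bytes
            String bits = String.format("%8s", Integer.toBinaryString(b & 0xFF))
                    .replace(' ', '0');
            sb.insert(0, bits);
        }
        String s = sb.toString();
        if (s.length() > n) {
            return s.substring(s.length() - n);  // drop padding bits
        }
        while (s.length() < n) {
            s = "0" + s;                         // restore leading zeros
        }
        return s;
    }

    public static void main(String[] args) {
        // Three distinct PostgreSQL bit values collapse to the same single
        // byte once the declared width is gone:
        byte[] bit10 = new BigInteger("0001111111", 2).toByteArray(); // bit(10)
        byte[] bit8  = new BigInteger("01111111", 2).toByteArray();   // bit(8)
        byte[] bit7  = new BigInteger("1111111", 2).toByteArray();    // bit(7)
        System.out.println(Arrays.equals(bit10, bit8)
                && Arrays.equals(bit8, bit7));               // true: all {127}

        // Only with the original n can the exact value be recovered:
        System.out.println(bitsToString(new byte[] {127}, 10)); // 0001111111
    }
}
```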
*Schema transparency:* Enhance metadata handling to communicate the original {{bit(n)}} (n > 1) schema accurately to downstream systems, enabling them to process the data with full knowledge of its intended format.

h1. Conclusion

Addressing these discrepancies will not only improve the reliability of the Flink CDC PostgreSQL Connector but also enhance its compatibility with the broad range of use cases that rely on precise {{bit}} data handling. I look forward to a resolution that ensures consistent and accurate processing across both snapshot and incremental modes.

Thank you for considering this issue; I am available to provide further details or assist in any way possible.

h1. Reproducing the Error

I use the following code to read records from the PostgreSQL connector by implementing the {{deserialize}} method of {{DebeziumDeserializationSchema<Record>}}:

{code:java}
public class PostgreSqlRecordSourceDeserializeSchema
        implements DebeziumDeserializationSchema<Record> {

    public void deserialize(SourceRecord sourceRecord, Collector<Record> out)
            throws Exception {
        // skipping irrelevant business logic ...
        Struct rowValue =
                ((Struct) sourceRecord.value()).getStruct(Envelope.FieldName.AFTER);

        for (Field field : rowValue.schema().fields()) {
            switch (field.schema().type()) {
                case BOOLEAN:
                    // handling bit/boolean type
                    Boolean value = rowValue.getBoolean(field.name());
                    break;
                case BYTES:
                    if (StringUtils.equals(field.schema().name(), Bits.class.getName())) {
                        // handling bit(n) type, where n > 1
                        String byteString = "";
                        for (byte b : rowValue.getBytes(field.name())) {
                            // Integer.toBinaryString yields 32 characters for
                            // negative bytes, hence the substring below
                            String tmpByteString = String.format("%8s",
                                    Integer.toBinaryString(b)).replace(' ', '0');
                            if (tmpByteString.length() == 32) {
                                tmpByteString = tmpByteString.substring(24);
                            }
                            byteString = tmpByteString + byteString;
                        }
                    } else {
                        // skipping irrelevant business logic ...
                    }
                    break;
                // other cases: skipping irrelevant business logic ...
            }
            // skipping irrelevant business logic ...
        }
    }
}
{code}

h1. Version

I use the following Maven dependency:
{code:java}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
</dependency>
{code}
PostgreSQL server version: tested on 14.0, 15.0, and 16.0; the error occurs regardless of the PostgreSQL version.
JDK: Java 8.

h1. Offer for Assistance

I am willing to provide additional test scenarios or results to help diagnose this issue further. I am also open to collaborating on reviewing potential fixes or providing any necessary feedback to ensure a comprehensive resolution of this discrepancy.

Thank you for your attention to this issue. I look forward to working together to enhance the reliability and accuracy of the Flink CDC PostgreSQL Connector.

Best regards,
AldrichZeng(曾曜)

--
This message was sent by Atlassian Jira
(v8.20.10#820010)