[
https://issues.apache.org/jira/browse/FLINK-35907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
jiangcheng updated FLINK-35907:
-------------------------------
Description:
I am encountering issues with the Flink CDC PostgreSQL Connector's handling of
{{bit}} and {{bit(n)}} data types, which seem to deviate from expected
behavior, leading to inconsistencies and errors during both snapshot and
incremental sync phases.
h1. Problems
h2. *Problem 1: Misinterpretation of {{bit}} and {{bit(1)}}*
During data retrieval, {{bit}} and {{bit(1)}} types are interpreted as
{{org.apache.kafka.connect.data.Schema.BOOLEAN}}, mirroring the treatment of
PostgreSQL's native {{bool}}/{{boolean}} type. This loses information whenever
the intention is to preserve the bit pattern rather than a simple true/false
value.
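For illustration, here is a minimal consumer-side sketch of what Problem 1 looks like; the column name {{flag_bit1}} is hypothetical, and the snippet assumes a plain Kafka Connect {{Struct}} as produced by the connector:
{code:java}
// Hedged consumer-side sketch: "flag_bit1" is a hypothetical column declared as bit(1).
Struct after = ((Struct) sourceRecord.value()).getStruct(Envelope.FieldName.AFTER);
Field bitField = after.schema().field("flag_bit1");
// Observed schema type: Schema.Type.BOOLEAN, indistinguishable from a native boolean column.
Schema.Type type = bitField.schema().type();
Boolean asBool = after.getBoolean("flag_bit1"); // the bit pattern is reduced to true/false
{code}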
h2. *Problem 2: Error with {{bit(n)}} during Snapshot Phase*
For {{bit(n)}} where {{n > 1}}, an error is encountered during the snapshot
phase within {{PostgresScanFetchTask}}. The issue arises from attempting to
read these values using {{PgResult#getBoolean}}, which is inappropriate for
{{bit(n)}} values that do not represent a standard boolean state. Notably,
this error does not surface during the incremental phase, because
{{PgResult#getBoolean}} is never invoked there.
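As a plain-JDBC illustration of why {{getBoolean}} is unsuitable here, the following hedged sketch (table and column names are made up, {{url}}, {{user}} and {{password}} are placeholders, and the stock {{org.postgresql}} driver is assumed) shows the kind of call the snapshot path effectively ends up making:
{code:java}
// Uses java.sql.{Connection, DriverManager, ResultSet, Statement}.
// Assumed table: CREATE TABLE bit_demo (b10 bit(10)); containing the row '0001111111'.
try (Connection conn = DriverManager.getConnection(url, user, password);
     Statement stmt = conn.createStatement();
     ResultSet rs = stmt.executeQuery("SELECT b10 FROM bit_demo")) {
    while (rs.next()) {
        // For bit/bit(1) this yields a plain true/false; for bit(10) the driver
        // cannot coerce the bit string into a boolean and the call fails,
        // which matches the snapshot-phase error described above.
        boolean b = rs.getBoolean(1);
    }
}
{code}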
h1. My Analysis
h2. BIT type is interpreted as BOOLEAN
Diving into the code reveals that, in both scenarios, the connector relies on
{{PgResult#getObject}}, which internally identifies {{bit}} and {{bit(n)}} as
{{java.sql.Type.BOOLEAN}}. This misclassification triggers the problematic use
of {{getBoolean}} for non-boolean representations such as {{bit(n)}}.
h2. Inconsistency between Snapshot and Incremental Phases
The discrepancy between the snapshot phase and incremental phase is noteworthy.
During the snapshot phase, errors manifest due to direct interaction with
{{PgResult#getObject}} in
{{{}PostgresScanFetchTask#createDataEventsForTable{}}}, and
{{PgResult#getObject}} is further forward to {{{}PgResult#getBoolean{}}}.
Conversely, in the incremental phase, {{bit(n)}} values are coerced into
{{{}org.apache.kafka.connect.data.Schema.BYTES{}}}, resulting in a loss of the
original {{n}} precision information. This forces consumers to assume an 8-bit
byte array representation, obscuring the intended bit-width and potentially
leading to incorrect interpretations (e.g., I insert a {{bit(10)}} value into
PostgreSQL Server named {{{}'0001111111'{}}}, which is represented as a byte
array of length = 1 in SourceRecord, the first element is 127).
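To make the information loss concrete, here is a small sketch of the arithmetic behind that example (the values follow the {{bit(10)}} case above; nothing here is connector API):
{code:java}
// '0001111111' interpreted as a number is 0b0001111111 = 127, which fits in one byte,
// so the SourceRecord carries a single byte {127}.
byte[] payload = {127};
String bits = Integer.toBinaryString(payload[0] & 0xFF);       // "1111111"
String padded = String.format("%8s", bits).replace(' ', '0');  // "01111111"
// From the payload alone it is impossible to tell whether the column was declared
// bit(8), bit(10) or bit(16); the leading zero bits and the width n are gone.
{code}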
h1. *My Opinion*
From my perspective, the following approaches may solve this problem.
* *Consistent Handling:* The connector should uniformly and accurately handle all {{bit}} variants, respecting their distinct PostgreSQL definitions.
* *Preserve Precision:* For {{bit(n)}}, ensure that the precision {{n}} is maintained throughout processing, so that consumers can correctly interpret the intended bit sequence without assuming a default byte size.
* *Schema Transparency:* Enhance metadata handling to communicate the original {{bit(n)}} schema accurately to downstream systems, enabling them to process the data with full knowledge of its intended format (a hedged sketch follows below).
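As a rough sketch of the third point, a consumer could recover the declared width if the connector exposed it as a schema parameter. The {{length}} parameter name below is an assumption for illustration only, not a statement about what the connector currently emits:
{code:java}
// Hypothetical: read a declared bit width from the field's schema parameters.
Map<String, String> params = field.schema().parameters(); // java.util.Map
byte[] bytes = rowValue.getBytes(field.name());
int bitWidth = (params != null && params.containsKey("length"))
        ? Integer.parseInt(params.get("length"))
        : 8 * bytes.length; // fall back to whole bytes when no width is exposed
// With bitWidth known, the decoded bit string can be left-padded to exactly n bits.
{code}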
h1. Conclusion
Addressing these discrepancies will not only improve the reliability of the
Flink CDC PostgreSQL Connector but also enhance its compatibility with a
broader range of use cases that rely on precise {{bit}} data handling. I look
forward to a resolution that ensures consistent and accurate processing across
both snapshot and incremental modes.
Thank you for considering this issue, and I'm available to provide further
details or assist in any way possible.
h1. Reproducing the Error
I use the following code to read records from the PostgreSQL connector by
implementing the {{deserialize}} method of
{{DebeziumDeserializationSchema<Record>}}:
{code:java}
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Bits;
import io.debezium.data.Envelope;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Record is the application's own output type; its definition is omitted here.
public class PostgreSqlRecordSourceDeserializeSchema
        implements DebeziumDeserializationSchema<Record> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<Record> out)
            throws Exception {
        // skipping irrelevant business logic ...
        Struct rowValue =
                ((Struct) sourceRecord.value()).getStruct(Envelope.FieldName.AFTER);

        for (Field field : rowValue.schema().fields()) {
            switch (field.schema().type()) {
                case BOOLEAN:
                    // bit, bit(1) and boolean columns all arrive here
                    Boolean value = rowValue.getBoolean(field.name());
                    break;
                case BYTES:
                    if (StringUtils.equals(field.schema().name(), Bits.class.getName())) {
                        // bit(n) with n > 1: rebuild the bit string, prepending each
                        // byte (payload assumed least-significant-byte first)
                        String byteString = "";
                        for (byte b : rowValue.getBytes(field.name())) {
                            String tmpByteString =
                                    String.format("%8s", Integer.toBinaryString(b)).replace(' ', '0');
                            if (tmpByteString.length() == 32) {
                                // negative bytes yield 32 binary chars; keep the low 8 bits
                                tmpByteString = tmpByteString.substring(24);
                            }
                            byteString = tmpByteString + byteString;
                        }
                    } else {
                        // skipping irrelevant business logic ...
                    }
                    break;
                default:
                    // skipping irrelevant business logic ...
            }
            // skipping irrelevant business logic ...
        }
    }

    @Override
    public TypeInformation<Record> getProducedType() {
        return TypeInformation.of(Record.class);
    }
} {code}
h1. Version
I use the following Maven dependency.
{code:xml}
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-postgres-cdc</artifactId>
    <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
</dependency> {code}
PostgreSQL Server Version: I have tested on 14.0, 15.0, and 16.0; the error
occurs regardless of the PostgreSQL version.
JDK: Java 8.
h1. Offer for Assistance
I am willing to provide additional test scenarios or results to help diagnose
this issue further. Moreover, I am open to collaborating on reviewing potential
fixes or providing any necessary feedback to ensure a comprehensive resolution
to this discrepancy.
Thank you for your attention to this issue, and I look forward to working
together towards enhancing the reliability and accuracy of the Flink CDC
PostgreSQL Connector.
Best regards,
AldrichZeng(曾曜)
> Inconsistent handling of bit and bit(n) types in Flink CDC PostgreSQL
> Connector
> -------------------------------------------------------------------------------
>
> Key: FLINK-35907
> URL: https://issues.apache.org/jira/browse/FLINK-35907
> Project: Flink
> Issue Type: Bug
> Environment: I use the following maven settings.
> {code:java}
> <dependency>
> <groupId>com.ververica</groupId>
> <artifactId>flink-connector-postgres-cdc</artifactId>
> <version>2.4-vvr-8.0-metrics-SNAPSHOT</version>
> </dependency> {code}
> PostgreSQL Server Version: I have tested on 14.0, 15.0, 16.0. This error
> occurs regardless of the PostgreSQL version.
> JDK: Java 8.
> Reporter: jiangcheng
> Priority: Major
> Original Estimate: 168h
> Remaining Estimate: 168h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)