[ https://issues.apache.org/jira/browse/FLINK-38647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18048489#comment-18048489 ]

Daishuyuan commented on FLINK-38647:
------------------------------------

I encountered the same issue and resolved it by setting {{REPLICA IDENTITY 
FULL}} on the source table after the pipeline transitioned to streaming mode.
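
For anyone hitting the same thing, this is the kind of change I mean, as a minimal sketch against the {{public.orders}} table from this issue (adjust the table name to your own setup):

{code:sql}
-- Check the current replica identity: 'd' = default (primary key only), 'f' = full
SELECT relreplident FROM pg_class WHERE oid = 'public.orders'::regclass;

-- Have Postgres log the complete old row for UPDATE/DELETE, so the CDC
-- "before" image carries all columns rather than only the key columns
ALTER TABLE public.orders REPLICA IDENTITY FULL;
{code}

My understanding is that without this the {{before}} image of an UPDATE is incomplete, which is what {{inferStruct}} appears to trip over.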

It's frustrating that the framework throws a generic {{NullPointerException}} 
instead of a more descriptive exception. It would be much more user-friendly if 
it could provide a clear error message or actionable guidance to identify the 
missing database configuration.

> NullPointerException in Postgres CDC pipeline when update data in table
> -----------------------------------------------------------------------
>
>                 Key: FLINK-38647
>                 URL: https://issues.apache.org/jira/browse/FLINK-38647
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Yang Guo
>            Priority: Critical
>
> I am building a CDC pipeline to capture data changes from a Postgres table to 
> Fluss. The CDC pipeline fails when I update records in the Postgres table. 
> Here is the error:
>  
> {code:java}
> java.lang.NullPointerException
> at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.lambda$inferStruct$1(DebeziumSchemaDataTypeInference.java:209)
> at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
> at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
> at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.inferStruct(DebeziumSchemaDataTypeInference.java:210)
> at org.apache.flink.cdc.connectors.postgres.source.PostgresSchemaDataTypeInference.inferStruct(PostgresSchemaDataTypeInference.java:44)
> at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:83)
> at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:58)
> at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractDataRecord(DebeziumEventDeserializationSchema.java:156)
> at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractBeforeDataRecord(DebeziumEventDeserializationSchema.java:146)
> at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserializeDataChangeRecord(DebeziumEventDeserializationSchema.java:126)
> at org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer.deserialize(SourceRecordEventDeserializer.java:49)
> at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:105)
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:160)
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:118)
> at org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter.processElement(PostgresPipelineRecordEmitter.java:114)
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:88)
> at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:57)
> at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
> at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.lang.Thread.run(Thread.java:750) {code}
> The Postgres table DDL is:
>  
> {code:java}
> CREATE TABLE orders (
>     order_id INT PRIMARY KEY,
>     customer_name VARCHAR(100) NOT NULL,
>     order_date DATE NOT NULL DEFAULT CURRENT_DATE,
>     total_amount DECIMAL(10,2) NOT NULL
> ); {code}
> Data Operations:
> I can successfully get the data change records (+I, -D) for INSERT and DELETE 
> in Fluss, but on UPDATE the CDC pipeline fails with the NPE listed above. 
> It looks like `inferStruct` failed.
> {code:java}
> INSERT INTO orders (order_id, customer_name, order_date, total_amount) VALUES
> (1001, 'John Smith', '2024-01-15', 299.99),
> (1002, 'Emma Johnson', '2024-01-16', 150.50),
> (1003, 'Michael Brown', '2024-01-17', 89.99),
> (1004, 'Sarah Davis', '2024-01-18', 450.00),
> (1005, 'David Wilson', '2024-01-19', 199.99);
> DELETE FROM orders WHERE order_id = 1004;
> -- this will make the CDC pipeline fail with NPE
> UPDATE orders SET customer_name = 'James Taylor' WHERE order_id = 1005;{code}
>  
> *More Information:*
>  
> The CDC pipeline is Postgres to Fluss.
> Versions: PostgreSQL 14, Flink 1.20.1, Flink CDC 3.5.0, Fluss 0.7
>  
> CDC configuration yaml file:
> {code:yaml}
> source:
>   type: postgres
>   name: Postgres Source
>   hostname: 127.0.0.1
>   port: 5432
>   username: postgres
>   password: postgres
>   tables: postgres.public.orders
>   decoding.plugin.name: pgoutput
>   slot.name: pgtest
> sink:
>   type: fluss
>   name: Fluss Sink
>   bootstrap.servers: localhost:9123
>   # Security-related properties for the Fluss client
>   properties.client.security.protocol: PLAINTEXT
> pipeline:
>   name: Postgres to Fluss Pipeline
>   parallelism: 1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)