[
https://issues.apache.org/jira/browse/FLINK-38647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18048489#comment-18048489
]
Daishuyuan commented on FLINK-38647:
------------------------------------
I encountered the same issue and resolved it by setting {{REPLICA IDENTITY
FULL}} on the source table after the pipeline transitioned to streaming mode.
It's frustrating that the framework throws a generic {{NullPointerException}}
instead of a descriptive exception; a clear error message, or actionable
guidance pointing at the missing database configuration, would be much more
user-friendly.
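For anyone else hitting this, the fix is a one-line DDL change, shown here for the {{public.orders}} table from this report (substitute your own schema and table names):

{code:sql}
-- Log the full old row image for UPDATE/DELETE so the Debezium
-- "before" record carries a value for every column, not just the key.
ALTER TABLE public.orders REPLICA IDENTITY FULL;

-- Verify: relreplident is 'f' for FULL ('d' = default, primary key only).
SELECT relreplident FROM pg_class WHERE relname = 'orders';
{code}

Note that {{REPLICA IDENTITY FULL}} makes Postgres write every column's old value to the WAL on UPDATE/DELETE, so expect some extra WAL volume on wide or hot tables.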
> NullPointerException in Postgres CDC pipeline when updating data in a table
> ---------------------------------------------------------------------------
>
> Key: FLINK-38647
> URL: https://issues.apache.org/jira/browse/FLINK-38647
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: Yang Guo
> Priority: Critical
>
> I am building a CDC pipeline to capture data change from Postgres table to
> Fluss. The CDC pipeline will fail when I update records in Postgres table.
> Here is the error:
>
> {code:java}
> java.lang.NullPointerException
>     at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.lambda$inferStruct$1(DebeziumSchemaDataTypeInference.java:209)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>     at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
>     at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.inferStruct(DebeziumSchemaDataTypeInference.java:210)
>     at org.apache.flink.cdc.connectors.postgres.source.PostgresSchemaDataTypeInference.inferStruct(PostgresSchemaDataTypeInference.java:44)
>     at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:83)
>     at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:58)
>     at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractDataRecord(DebeziumEventDeserializationSchema.java:156)
>     at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractBeforeDataRecord(DebeziumEventDeserializationSchema.java:146)
>     at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserializeDataChangeRecord(DebeziumEventDeserializationSchema.java:126)
>     at org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer.deserialize(SourceRecordEventDeserializer.java:49)
>     at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:105)
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:160)
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:118)
>     at org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter.processElement(PostgresPipelineRecordEmitter.java:114)
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:88)
>     at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:57)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:750)
> {code}
> The Postgres table DDL is:
>
> {code:sql}
> CREATE TABLE orders (
>     order_id INT PRIMARY KEY,
>     customer_name VARCHAR(100) NOT NULL,
>     order_date DATE NOT NULL DEFAULT CURRENT_DATE,
>     total_amount DECIMAL(10,2) NOT NULL
> );
> {code}
> Data Operations:
> INSERT and DELETE produce the expected change records (+I, -D) in Fluss,
> but any UPDATE fails the pipeline with the NPE listed above; it appears to
> fail inside {{inferStruct}}.
> {code:sql}
> INSERT INTO orders (order_id, customer_name, order_date, total_amount) VALUES
>     (1001, 'John Smith', '2024-01-15', 299.99),
>     (1002, 'Emma Johnson', '2024-01-16', 150.50),
>     (1003, 'Michael Brown', '2024-01-17', 89.99),
>     (1004, 'Sarah Davis', '2024-01-18', 450.00),
>     (1005, 'David Wilson', '2024-01-19', 199.99);
>
> DELETE FROM orders WHERE order_id = 1004;
>
> -- this will make the CDC pipeline fail with NPE
> UPDATE orders SET customer_name = 'James Taylor' WHERE order_id = 1005;
> {code}
>
> *More Information:*
>
> CDC pipeline: Postgres to Fluss
> Versions: PostgreSQL 14, Flink 1.20.1, Flink CDC 3.5.0, Fluss 0.7
>
> CDC configuration yaml file:
> {code:yaml}
> source:
>   type: postgres
>   name: Postgres Source
>   hostname: 127.0.0.1
>   port: 5432
>   username: postgres
>   password: postgres
>   tables: postgres.public.orders
>   decoding.plugin.name: pgoutput
>   slot.name: pgtest
>
> sink:
>   type: fluss
>   name: Fluss Sink
>   bootstrap.servers: localhost:9123
>   # Security-related properties for the Fluss client
>   properties.client.security.protocol: PLAINTEXT
>
> pipeline:
>   name: Postgres to Fluss Pipeline
>   parallelism: 1
> {code}
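> A quick sanity check one can run against the source database once the job is
> up (assuming the {{pgtest}} slot name and {{pgoutput}} plugin from the YAML
> above):
>
> {code:sql}
> -- Confirm the replication slot exists and is attached to pgoutput.
> SELECT slot_name, plugin, active
> FROM pg_replication_slots
> WHERE slot_name = 'pgtest';
> {code}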
--
This message was sent by Atlassian Jira
(v8.20.10#820010)