[ 
https://issues.apache.org/jira/browse/FLINK-38647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yang Guo updated FLINK-38647:
-----------------------------
    Description: 
I am building a CDC pipeline to capture data changes from a Postgres table to 
Fluss. The pipeline fails when I update records in the Postgres table. Here 
is the error:

 
{code:java}
java.lang.NullPointerException
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.lambda$inferStruct$1(DebeziumSchemaDataTypeInference.java:209)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.inferStruct(DebeziumSchemaDataTypeInference.java:210)
at org.apache.flink.cdc.connectors.postgres.source.PostgresSchemaDataTypeInference.inferStruct(PostgresSchemaDataTypeInference.java:44)
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:83)
at org.apache.flink.cdc.debezium.event.DebeziumSchemaDataTypeInference.infer(DebeziumSchemaDataTypeInference.java:58)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractDataRecord(DebeziumEventDeserializationSchema.java:156)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.extractBeforeDataRecord(DebeziumEventDeserializationSchema.java:146)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserializeDataChangeRecord(DebeziumEventDeserializationSchema.java:126)
at org.apache.flink.cdc.debezium.event.SourceRecordEventDeserializer.deserialize(SourceRecordEventDeserializer.java:49)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:105)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitElement(IncrementalSourceRecordEmitter.java:160)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.processElement(IncrementalSourceRecordEmitter.java:118)
at org.apache.flink.cdc.connectors.postgres.source.reader.PostgresPipelineRecordEmitter.processElement(PostgresPipelineRecordEmitter.java:114)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:88)
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter.emitRecord(IncrementalSourceRecordEmitter.java:57)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750) {code}
The Postgres table DDL is:

 
{code:sql}
CREATE TABLE orders (
    order_id INT PRIMARY KEY,
    customer_name VARCHAR(100) NOT NULL,
    order_date DATE NOT NULL DEFAULT CURRENT_DATE,
    total_amount DECIMAL(10,2) NOT NULL
); {code}
Data Operations:

I can successfully get the change records (+I, -D) for INSERT and DELETE in 
Fluss. But on UPDATE, the CDC pipeline fails with the NPE listed above. It 
looks like `inferStruct` failed.
{code:sql}
INSERT INTO orders (order_id, customer_name, order_date, total_amount) VALUES
(1001, 'John Smith', '2024-01-15', 299.99),
(1002, 'Emma Johnson', '2024-01-16', 150.50),
(1003, 'Michael Brown', '2024-01-17', 89.99),
(1004, 'Sarah Davis', '2024-01-18', 450.00),
(1005, 'David Wilson', '2024-01-19', 199.99);

DELETE FROM orders WHERE order_id = 1004;

-- this will make the CDC pipeline fail with NPE
UPDATE orders SET customer_name = 'James Taylor' WHERE order_id = 1005;{code}
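
A possible cause worth checking (my assumption, not confirmed from the stack trace): with PostgreSQL's default REPLICA IDENTITY, the logical-decoding `before` image of an UPDATE carries only the primary-key columns, so the non-key fields of the `before` record may arrive as null when `inferStruct` walks them. A commonly suggested workaround is to make Postgres log the full old row:
{code:sql}
-- Assumption: the NPE comes from null non-key fields in the UPDATE's
-- `before` image. REPLICA IDENTITY FULL makes Postgres log the whole
-- old row instead of just the primary key.
ALTER TABLE orders REPLICA IDENTITY FULL;
{code}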
 

*More Information:*

The CDC pipeline is Postgres to Fluss.

Versions: PostgreSQL 14, Flink 1.20.1, Flink CDC 3.5.0, Fluss 0.7
 
CDC configuration yaml file:
{code:yaml}
source:
  type: postgres
  name: Postgres Source
  hostname: 127.0.0.1
  port: 5432
  username: postgres
  password: postgres
  tables: postgres.public.orders
  decoding.plugin.name: pgoutput
  slot.name: pgtest

sink:
  type: fluss
  name: Fluss Sink
  bootstrap.servers: localhost:9123
  # Security-related properties for the Fluss client
  properties.client.security.protocol: PLAINTEXT

pipeline:
  name: Postgres to Fluss Pipeline
  parallelism: 1 {code}
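
As a diagnostic suggestion (assuming the replica-identity hypothesis above is relevant), the table's current setting can be read from the Postgres catalog:
{code:sql}
-- relreplident: 'd' = default (primary key only), 'f' = full,
--               'n' = nothing, 'i' = index
SELECT relreplident FROM pg_class WHERE relname = 'orders';
{code}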


> NullPointerException in Postgres CDC pipeline when update data in table
> -----------------------------------------------------------------------
>
>                 Key: FLINK-38647
>                 URL: https://issues.apache.org/jira/browse/FLINK-38647
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Yang Guo
>            Priority: Critical
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
