[ 
https://issues.apache.org/jira/browse/FLINK-38965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18053870#comment-18053870
 ] 

Tao Wang commented on FLINK-38965:
----------------------------------

Stepping through the code in a debugger, I found that 
databaseMetadata.getColumns returns the columns of both tables. 

I then executed the underlying SQL of getColumns directly, and *the SQL 
returned the column information of both tables.* 

IMO the cause is that *in PostgreSQL, the underscore {{_}} in a {{LIKE}} 
pattern matches any single character*, so the pattern 'ndi_pg_user_sink_1' 
also matches the table ndi_pg_userbsink_1. 
{code:sql}
SELECT * FROM (
  SELECT n.nspname, c.relname, a.attname, a.atttypid,
         a.attnotnull OR (t.typtype = 'd' AND t.typnotnull) AS attnotnull,
         a.atttypmod, a.attlen, t.typtypmod,
         row_number() OVER (PARTITION BY a.attrelid ORDER BY a.attnum) AS attnum,
         nullif(a.attidentity, '') AS attidentity,
         nullif(a.attgenerated, '') AS attgenerated,
         pg_catalog.pg_get_expr(def.adbin, def.adrelid) AS adsrc,
         dsc.description, t.typbasetype, t.typtype
  FROM pg_catalog.pg_namespace n
  JOIN pg_catalog.pg_class c ON (c.relnamespace = n.oid)
  JOIN pg_catalog.pg_attribute a ON (a.attrelid = c.oid)
  JOIN pg_catalog.pg_type t ON (a.atttypid = t.oid)
  LEFT JOIN pg_catalog.pg_attrdef def ON (a.attrelid = def.adrelid AND a.attnum = def.adnum)
  LEFT JOIN pg_catalog.pg_description dsc ON (c.oid = dsc.objoid AND a.attnum = dsc.objsubid)
  LEFT JOIN pg_catalog.pg_class dc ON (dc.oid = dsc.classoid AND dc.relname = 'pg_class')
  LEFT JOIN pg_catalog.pg_namespace dn ON (dc.relnamespace = dn.oid AND dn.nspname = 'pg_catalog')
  WHERE c.relkind IN ('r','p','v','f','m')
    AND a.attnum > 0 AND NOT a.attisdropped
    AND n.nspname LIKE 'public'
    AND c.relname LIKE 'ndi_pg_user_sink_1'
) c WHERE true
ORDER BY nspname, c.relname, attnum
{code}
!image-2026-01-23-18-17-50-692.png|width=697,height=486!!image-2026-01-23-18-21-17-272.png|width=691,height=273!
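
To make the wildcard behaviour concrete without a database, here is a minimal Java sketch that emulates {{LIKE}} matching with a regex. The class and method names (LikeUnderscoreDemo, likeToRegex, likeMatches, escapeLike) are hypothetical and not part of Flink CDC, Debezium, or the PostgreSQL JDBC driver, and backslash is assumed as the escape character. It only illustrates why the unescaped table name matches both tables, and that escaping the underscores would restrict the match to one.

```java
import java.util.regex.Pattern;

public class LikeUnderscoreDemo {

    /** Translates a SQL LIKE pattern (backslash assumed as escape) into a regex. */
    static Pattern likeToRegex(String like) {
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < like.length(); i++) {
            char ch = like.charAt(i);
            if (ch == '\\' && i + 1 < like.length()) {
                // Escaped character: match the next character literally.
                regex.append(Pattern.quote(String.valueOf(like.charAt(++i))));
            } else if (ch == '_') {
                regex.append('.');   // '_' matches any single character
            } else if (ch == '%') {
                regex.append(".*");  // '%' matches any sequence of characters
            } else {
                regex.append(Pattern.quote(String.valueOf(ch)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL);
    }

    /** Returns true if the value matches the LIKE pattern. */
    static boolean likeMatches(String likePattern, String value) {
        return likeToRegex(likePattern).matcher(value).matches();
    }

    /** Escapes LIKE wildcards so that a table name is matched literally. */
    static String escapeLike(String name) {
        return name.replace("\\", "\\\\").replace("_", "\\_").replace("%", "\\%");
    }

    public static void main(String[] args) {
        String tableName = "ndi_pg_user_sink_1";

        // Unescaped: every '_' is a wildcard, so both tables match.
        System.out.println(likeMatches(tableName, "ndi_pg_user_sink_1")); // true
        System.out.println(likeMatches(tableName, "ndi_pg_userbsink_1")); // true

        // Escaped: only the exact table name matches.
        String escaped = escapeLike(tableName);
        System.out.println(likeMatches(escaped, "ndi_pg_user_sink_1"));   // true
        System.out.println(likeMatches(escaped, "ndi_pg_userbsink_1"));   // false
    }
}
```

In JDBC, the table argument of DatabaseMetaData.getColumns is a name pattern, and DatabaseMetaData.getSearchStringEscape() returns the string a driver uses to escape such wildcards. Whether escaping at the call site is the right fix for this issue is an assumption on my side.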

> Postgres cdc source encountered  "java.lang.IllegalStateException: Duplicate 
> key Optional"
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-38965
>                 URL: https://issues.apache.org/jira/browse/FLINK-38965
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Tao Wang
>            Priority: Major
>         Attachments: image-2026-01-23-18-17-50-692.png, 
> image-2026-01-23-18-21-17-272.png
>
>
> connector: Postgres
> description:
> 1. We have two tables with similar names:
> ndi_pg_user_sink_1
> ndi_pg_userbsink_1
> 2. They have the same schema.
> 3. With PostgreSQL as the CDC source, we encountered the following exception:
> {code:java}
> java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-base-ne-flink-1.14.0-1.0.13.jar:ne-flink-1.14.0-1.0.13]
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-base-ne-flink-1.14.0-1.0.13.jar:ne-flink-1.14.0-1.0.13]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_421]
>     at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[?:1.8.0_421]
>     at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_421]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_421]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_421]
>     at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_421]
> Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
>     at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:50) ~[debezium-core-1.9.7.Final.jar:1.9.7.Final]
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:214) ~[flink-connector-postgres-cdc-.jar:1.9.7.Final]
>     at com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:211) ~[flink-connector-postgres-cdc-.jar:]
>     at com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:95) ~[flink-connector-postgres-cdc-.jar:]
>     at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-cdc-base-.jar:]
>     ... 6 more
> Caused by: java.lang.IllegalStateException: Duplicate key Optional.empty
>     at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) ~[?:1.8.0_421]
>     at java.util.HashMap.merge(HashMap.java:1255) ~[?:1.8.0_421]
>     at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_421]
>     at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_421]
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_421]
>     at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175) ~[?:1.8.0_421]
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) ~[?:1.8.0_421]
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_421]
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_421]
>     at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_421]
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_421]
>     at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) ~[?:1.8.0_421]
>     at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.handleRelationMessage(PgOutputMessageDecoder.java:319) ~[flink-connector-postgres-cdc-.jar:]
>     at io.debezium.connector.postgresql.connection.pgoutput.PgOutputMessageDecoder.processNotEmptyMessage(PgOutputMessageDecoder.java:178) ~[flink-connector-postgres-cdc-.jar:]
>     at io.debezium.connector.postgresql.connection.AbstractMessageDecoder.processMessage(AbstractMessageDecoder.java:33) ~[debezium-connector-postgres-1.9.7.Final.jar:]
>     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.deserializeMessages(PostgresReplicationConnection.java:604) ~[flink-connector-postgres-cdc-.jar:]
>     at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:594) ~[flink-connector-postgres-cdc-.jar:]
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:257) ~[flink-connector-postgres-cdc-.jar:1.9.7.Final]
>     at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:212) ~[flink-connector-postgres-cdc-.jar:1.9.7.Final]
>     at com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask$StreamSplitReadTask.execute(PostgresStreamFetchTask.java:211) ~[flink-connector-postgres-cdc-.jar:]
>     at com.ververica.cdc.connectors.postgres.source.fetch.PostgresStreamFetchTask.execute(PostgresStreamFetchTask.java:95) ~[flink-connector-postgres-cdc-.jar:]
>     at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-cdc-base-.jar:]
>     ... 6 more
> {code}
>  
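
The innermost frames of the trace show Collectors.toMap failing inside HashMap.merge. As a rough illustration of how that can happen when column metadata for two same-schema tables is collected into one map keyed by column name, here is a minimal sketch. The Column class, the column names, and both helper methods are hypothetical and are not the Debezium implementation; the assumption is only that the metadata lookup returned the columns of both tables.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class DuplicateKeyDemo {

    /** Hypothetical stand-in for one row of column metadata. */
    static final class Column {
        final String table;
        final String name;
        final Optional<String> defaultValue = Optional.empty();

        Column(String table, String name) {
            this.table = table;
            this.name = name;
        }
    }

    /** Two-argument toMap throws IllegalStateException on a repeated key. */
    static Map<String, Optional<String>> defaultsByName(List<Column> columns) {
        return columns.stream()
                .collect(Collectors.toMap(c -> c.name, c -> c.defaultValue));
    }

    /** Restricting the input to one exact table avoids repeated column names. */
    static Map<String, Optional<String>> defaultsByName(List<Column> columns, String table) {
        return columns.stream()
                .filter(c -> c.table.equals(table))
                .collect(Collectors.toMap(c -> c.name, c -> c.defaultValue));
    }

    public static void main(String[] args) {
        // Columns of both tables, as an over-matching metadata lookup would return them.
        List<Column> columns = Arrays.asList(
                new Column("ndi_pg_user_sink_1", "id"),
                new Column("ndi_pg_user_sink_1", "name"),
                new Column("ndi_pg_userbsink_1", "id"),
                new Column("ndi_pg_userbsink_1", "name"));

        try {
            defaultsByName(columns);
        } catch (IllegalStateException e) {
            // The exact message text depends on the JDK version.
            System.out.println("toMap failed: " + e.getMessage());
        }

        // Filtering by the exact table name yields one entry per column.
        System.out.println(defaultsByName(columns, "ndi_pg_user_sink_1").size());
    }
}
```

As far as I know, the Java 8 implementation of this merger formats the clashing value rather than the key into the message, which would be consistent with the trace reporting "Duplicate key Optional.empty" instead of a column name.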



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
