[
https://issues.apache.org/jira/browse/FLINK-38688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18039080#comment-18039080
]
martin1986 commented on FLINK-38688:
------------------------------------
[~tejanshrana] I am using PostgreSQL 16.1 on x86_64-pc-linux-gnu, compiled by
gcc (Debian 10.2.1-6) 10.2.1 20210110, 64-bit
> JdbcIncrementalSource with StartupOptions.initial() fails, while
> StartupOptions.latest() and legacy PostgreSQLSource work (Flink CDC 3.5)
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-38688
> URL: https://issues.apache.org/jira/browse/FLINK-38688
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: martin1986
> Priority: Major
>
> {{JdbcIncrementalSource}} with {{StartupOptions.initial()}} fails, while
> {{StartupOptions.latest()}} and legacy PostgreSQLSource work (Flink CDC 3.5)
> ----
> *Description*
> I’m using *Flink CDC 3.5* with PostgreSQL and ran into an issue when
> migrating from the legacy {{PostgreSQLSource}} to the new
> {{JdbcIncrementalSource}} ({{{}PostgresIncrementalSource{}}}).
> When I configure the source with *{{StartupOptions.initial()}}* (expecting a
> _full snapshot + incremental_ pipeline), the job fails at runtime with an
> exception.
> However, if I change the configuration to *{{StartupOptions.latest()}}*,
> the job starts and runs normally (only consuming ongoing changes from the
> replication slot, with no snapshot).
> With the {*}same database/table/slot{*}, the legacy {{PostgreSQLSource}} can
> successfully read the {*}full snapshot{*}, so it does not look like a
> table/permission issue but rather something specific to the new
> {{JdbcIncrementalSource}} in 3.5.
> ----
> *Code Snippets*
> h3. 1. JdbcIncrementalSource (failing with {{{}initial(){}}})
>
>
> {{JdbcIncrementalSource<String> postgresSource =
>         PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
>                 .hostname(cfg.getFundCdc().getHostname())
>                 .port(cfg.getFundCdc().getPort())
>                 .database(cfg.getFundCdc().getDatabase())
>                 .schemaList(cfg.getFundCdc().getSchema())
>                 .tableList(cfg.getFundCdc().getTableList().toArray(new String[0]))
>                 .username(cfg.getFundCdc().getUsername())
>                 .password(cfg.getFundCdc().getPassword())
>                 .decodingPluginName("pgoutput")
>                 .slotName(cfg.getFundCdc().getSlotName())
>                 .startupOptions(StartupOptions.initial()) // This causes the job to fail
>                 .debeziumProperties(props)
>                 .deserializer(new JsonDebeziumDeserializationSchema())
>                 .build();}}
> * With {{.startupOptions(StartupOptions.initial())}} → the job fails with an exception.
> * If I change this line to {{.startupOptions(StartupOptions.latest())}} → the job runs successfully (see the wiring sketch below).
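> For reference, the source is wired into the job roughly as below. This is a
> minimal sketch, not the exact production code: the checkpoint interval and
> environment setup are placeholders, while the source name, the print sink and
> the job name match the log output further down.
> {{import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // Incremental snapshot reading relies on checkpointing; the interval is a placeholder.
> env.enableCheckpointing(3000);
> env.fromSource(postgresSource, WatermarkStrategy.noWatermarks(), "pg-cdc-fund")
>         .print();
> env.execute("nova-fund-game-player-cdc");}}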
> ----
> h3. 2. Legacy PostgreSQLSource (works fine with full snapshot)
> The legacy {{PostgreSQLSource}}, using the same configuration, is able to
> perform a full snapshot without any error:
>
>
> {{SourceFunction<String> postgresSource = PostgreSQLSource.<String>builder()
>         .hostname(cfg.getFundCdc().getHostname())
>         .port(cfg.getFundCdc().getPort())
>         .database(cfg.getFundCdc().getDatabase())
>         .schemaList(cfg.getFundCdc().getSchema())
>         .tableList("nova-fund\\.player_transaction") // dot needs to be escaped
>         .username(cfg.getFundCdc().getUsername())
>         .password(cfg.getFundCdc().getPassword())
>         .decodingPluginName("pgoutput")
>         .slotName(cfg.getFundCdc().getSlotName())
>         .deserializer(new JsonDebeziumDeserializationSchema())
>         .build();}}
> This source correctly reads all *snapshot data* from
> {{{}nova-fund.player_transaction{}}}.
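> For comparison, the legacy source is attached through the old
> {{SourceFunction}} API, roughly as below (again a simplified sketch with
> placeholder settings):
> {{import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(3000); // placeholder interval
> env.addSource(postgresSource)  // legacy SourceFunction-based API
>         .print();
> env.execute("nova-fund-game-player-cdc");}}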
> ----
> *Expected Behavior*
> * {{JdbcIncrementalSource}} with {{StartupOptions.initial()}} should:
> ** take an initial snapshot of {{nova-fund.player_transaction}} (the table
> {*}does have a primary key{*}),
> ** and then continue to read changes from the replication slot (see the
> illustrative records below).
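> To make this concrete, the records emitted by
> {{JsonDebeziumDeserializationSchema}} should look roughly like the standard
> Debezium envelope below (fields elided, values purely illustrative):
> {{// snapshot phase: read events
> {"before":null,"after":{"id":"...", ...},"source":{..., "snapshot":"true"},"op":"r","ts_ms":...}
> // streaming phase: change events from the replication slot
> {"before":null,"after":{"id":"...", ...},"source":{...},"op":"c","ts_ms":...}
> // op: "r" = snapshot read, "c"/"u"/"d" = insert/update/delete}}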
> ----
> *Actual Behavior*
> * With {{{}StartupOptions.initial(){}}}:
> ** The job fails during the startup / initial snapshot phase with an exception.
> * With {{{}StartupOptions.latest(){}}}:
> ** The job starts and runs normally, consuming only incremental changes.
> * With the legacy {{{}PostgreSQLSource{}}}:
> ** Full snapshot + subsequent changes work fine with the same table and slot.
> ----
> *Environment*
> * Flink CDC: 3.5
> * Database: PostgreSQL (logical decoding plugin: {{{}pgoutput{}}})
> * Table: {{nova-fund.player_transaction}}
> ** primary key is defined
> * Deserializer: {{JsonDebeziumDeserializationSchema}}
> ----
> *Error / Stack Trace*
> d.p.s.AbstractSnapshotChangeEventSource - Snapshot - Final stage
> [2025-11-09 21:28:15.558] [INFO ] [pool-11-thread-1] [dev]
> [jobId=unknown-job] [host=dev-ubuntu-002] i.d.j.JdbcConnection - Connection
> gracefully closed
> [2025-11-09 21:28:15.559] [INFO ] [pool-12-thread-1] [dev]
> [jobId=unknown-job] [host=dev-ubuntu-002] i.d.j.JdbcConnection - Connection
> gracefully closed
> [2025-11-09 21:28:15.586] [INFO ] [pool-13-thread-1] [dev]
> [jobId=unknown-job] [host=dev-ubuntu-002] i.d.j.JdbcConnection - Connection
> gracefully closed
> [2025-11-09 21:28:15.587] [ERROR] [debezium-snapshot-reader-8] [dev]
> [jobId=unknown-job] [host=dev-ubuntu-002]
> o.a.f.c.c.b.s.r.e.IncrementalSourceScanFetcher - Execute snapshot read task
> for snapshot split
> SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> fail
> io.debezium.DebeziumException: java.lang.NullPointerException
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.executeDataSnapshot(PostgresScanFetchTask.java:112)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:71)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.execute(PostgresScanFetchTask.java:86)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:100)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:233)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEvents(PostgresScanFetchTask.java:268)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:255)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:211)
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
> ... 7 more
> [2025-11-09 21:28:15.961] [WARN ] [Source Data Fetcher for Source:
> pg-cdc-fund -> Sink: Print to Std. Out (9/24)#0] [dev] [jobId=unknown-job]
> [host=dev-ubuntu-002] o.a.f.c.c.b.s.r.IncrementalSourceSplitReader - fetch
> data failed.
> org.apache.flink.util.FlinkRuntimeException: Read split
> SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:205)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSourceScanFetcher.java:156)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:122)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:161)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> at
> java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.executeDataSnapshot(PostgresScanFetchTask.java:112)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:71)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.execute(PostgresScanFetchTask.java:86)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:100)
> ... 3 more
> Caused by: java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:233)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEvents(PostgresScanFetchTask.java:268)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:255)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:211)
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
> ... 7 more
> [2025-11-09 21:28:15.964] [ERROR] [Source Data Fetcher for Source:
> pg-cdc-fund -> Sink: Print to Std. Out (9/24)#0] [dev] [jobId=unknown-job]
> [host=dev-ubuntu-002] o.a.f.c.b.s.r.f.SplitFetcherManager - Received uncaught
> exception.
> java.lang.RuntimeException: SplitFetcher thread 0 received unexpected
> exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> at
> java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException:
> Read split SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Read split
> SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:205)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSourceScanFetcher.java:156)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:122)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:161)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
> ... 9 more
> Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.executeDataSnapshot(PostgresScanFetchTask.java:112)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:71)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.execute(PostgresScanFetchTask.java:86)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:100)
> ... 3 more
> Caused by: java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:233)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEvents(PostgresScanFetchTask.java:268)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:255)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:211)
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
> ... 7 more
> [2025-11-09 21:28:15.969] [INFO ] [Source: pg-cdc-fund -> Sink: Print to Std.
> Out (9/24)#0] [dev] [jobId=unknown-job] [host=dev-ubuntu-002]
> o.a.f.c.b.s.r.SourceReaderBase - Closing Source Reader.
> [2025-11-09 21:28:15.970] [INFO ] [Source: pg-cdc-fund -> Sink: Print to Std.
> Out (9/24)#0] [dev] [jobId=unknown-job] [host=dev-ubuntu-002]
> o.a.f.c.b.s.r.f.SplitFetcher - Shutting down split fetcher 0
> [2025-11-09 21:28:15.990] [INFO ] [pool-14-thread-1] [dev]
> [jobId=unknown-job] [host=dev-ubuntu-002] i.d.j.JdbcConnection - Connection
> gracefully closed
> [2025-11-09 21:28:15.991] [INFO ] [Source Data Fetcher for Source:
> pg-cdc-fund -> Sink: Print to Std. Out (9/24)#0] [dev] [jobId=unknown-job]
> [host=dev-ubuntu-002] o.a.f.c.b.s.r.f.SplitFetcher - Split fetcher 0 exited.
> [2025-11-09 21:28:15.997] [WARN ] [Source: pg-cdc-fund -> Sink: Print to Std.
> Out (9/24)#0] [dev] [jobId=unknown-job] [host=dev-ubuntu-002] o.a.f.r.t.Task
> - Source: pg-cdc-fund -> Sink: Print to Std. Out (9/24)#0
> (3e49993d76fa194813ae301c672f5e17_cbc357ccb763df2852fee8c4fc7d55f2_8_0)
> switched from RUNNING to FAILED with failure cause:
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> at
> java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> ... 1 more
> Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException:
> Read split SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Read split
> SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:205)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSourceScanFetcher.java:156)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:122)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:161)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
> ... 9 more
> Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.executeDataSnapshot(PostgresScanFetchTask.java:112)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:71)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.execute(PostgresScanFetchTask.java:86)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:100)
> ... 3 more
> Caused by: java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:233)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEvents(PostgresScanFetchTask.java:268)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:255)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:211)
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
> ... 7 more
> [2025-11-09 21:28:16.004] [INFO ] [Source: pg-cdc-fund -> Sink: Print to Std.
> Out (9/24)#0] [dev] [jobId=unknown-job] [host=dev-ubuntu-002] o.a.f.r.t.Task
> - Freeing task resources for Source: pg-cdc-fund -> Sink: Print to Std. Out
> (9/24)#0
> (3e49993d76fa194813ae301c672f5e17_cbc357ccb763df2852fee8c4fc7d55f2_8_0).
> [2025-11-09 21:28:16.034] [INFO ] [flink-pekko.actor.default-dispatcher-8]
> [dev] [jobId=unknown-job] [host=dev-ubuntu-002] o.a.f.r.t.TaskExecutor -
> Un-registering task and sending final execution state FAILED to JobManager
> for task Source: pg-cdc-fund -> Sink: Print to Std. Out (9/24)#0
> 3e49993d76fa194813ae301c672f5e17_cbc357ccb763df2852fee8c4fc7d55f2_8_0.
> [2025-11-09 21:28:16.061] [INFO ] [flink-pekko.actor.default-dispatcher-7]
> [dev] [jobId=unknown-job] [host=dev-ubuntu-002] o.a.f.r.e.ExecutionGraph -
> Source: pg-cdc-fund -> Sink: Print to Std. Out (9/24)
> (3e49993d76fa194813ae301c672f5e17_cbc357ccb763df2852fee8c4fc7d55f2_8_0)
> switched from RUNNING to FAILED on 926d3be8-d86b-4ef9-980a-69c8233e976d @
> localhost (dataPort=-1).
> java.lang.RuntimeException: One or more fetchers have encountered exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> at
> java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> ... 1 more
> Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException:
> Read split SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Read split
> SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:205)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSourceScanFetcher.java:156)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:122)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:161)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
> ... 9 more
> Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.executeDataSnapshot(PostgresScanFetchTask.java:112)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:71)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.execute(PostgresScanFetchTask.java:86)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:100)
> ... 3 more
> Caused by: java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:233)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEvents(PostgresScanFetchTask.java:268)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:255)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:211)
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
> ... 7 more
> [2025-11-09 21:28:16.072] [INFO ] [flink-pekko.actor.default-dispatcher-8]
> [dev] [jobId=unknown-job] [host=dev-ubuntu-002]
> o.a.f.r.r.s.FineGrainedSlotManager - Received resource requirements from job
> 897c3635a63b0f571da45355d867e00d:
> [ResourceRequirement\{resourceProfile=ResourceProfile{UNKNOWN},
> numberOfRequiredSlots=23}]
> [2025-11-09 21:28:16.078] [INFO ] [SourceCoordinator-Source: pg-cdc-fund]
> [dev] [jobId=unknown-job] [host=dev-ubuntu-002] o.a.f.r.s.c.SourceCoordinator
> - Removing registered reader after failure for subtask 8 (#0) of source
> Source: pg-cdc-fund.
> [2025-11-09 21:28:16.083] [INFO ] [flink-pekko.actor.default-dispatcher-7]
> [dev] [jobId=unknown-job] [host=dev-ubuntu-002] o.a.f.r.e.ExecutionGraph -
> Job nova-fund-game-player-cdc (897c3635a63b0f571da45355d867e00d) switched
> from state RUNNING to FAILING.
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:219)
> at
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.handleFailureAndReport(ExecutionFailureHandler.java:166)
> at
> org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:121)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:281)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:272)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:265)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:788)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:765)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:515)
> at
> java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
> at
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
> at
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
> at
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec$$$capture(ForkJoinTask.java:387)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
> Caused by: java.lang.RuntimeException: One or more fetchers have encountered
> exception
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
> at
> org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> at java.base/java.lang.Thread.run(Thread.java:1583)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
> unexpected exception while polling the records
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
> at
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
> at
> java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:317)
> at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
> at
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
> ... 1 more
> Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException:
> Read split SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101)
> at
> org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
> at
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Read split
> SnapshotSplit\{tableId=nova-fund.nova-fund.player_transaction,
> splitId='nova-fund.nova-fund.player_transaction:0', splitKeyType=[`id`
> VARCHAR(36) NOT NULL], splitStart=null, splitEnd=null, highWatermark=null}
> error due to java.lang.NullPointerException.
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.checkReadException(IncrementalSourceScanFetcher.java:205)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollWithBuffer(IncrementalSourceScanFetcher.java:156)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.pollSplitRecords(IncrementalSourceScanFetcher.java:122)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:161)
> at
> org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98)
> ... 9 more
> Caused by: io.debezium.DebeziumException: java.lang.NullPointerException
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:85)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.executeDataSnapshot(PostgresScanFetchTask.java:112)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask.execute(AbstractScanFetchTask.java:71)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask.execute(PostgresScanFetchTask.java:86)
> at
> org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$1(IncrementalSourceScanFetcher.java:100)
> ... 3 more
> Caused by: java.lang.NullPointerException
> at java.base/java.util.Objects.requireNonNull(Objects.java:233)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.createDataEvents(PostgresScanFetchTask.java:268)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:255)
> at
> org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresScanFetchTask$PostgresSnapshotSplitReadTask.doExecute(PostgresScanFetchTask.java:211)
> at
> io.debezium.pipeline.source.AbstractSnapshotChangeEventSource.execute(AbstractSnapshotChangeEventSource.java:76)
> ... 7 more
>