GitSOMIC opened a new issue, #10322: URL: https://github.com/apache/seatunnel/issues/10322
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

With `startup.mode = "initial"`, the job fails with `Creation of replication slot failed` because the replication slot `ore_slot_test_20260112n1111nn` already exists (see the error below).

```conf
# Startup Mode:
# initial: Synchronize historical data at startup, then synchronize incremental data.
# earliest: Start from the earliest possible offset.
# latest: Start from the latest offset.
startup.mode = "initial"
```

This might be related to the startup mode: when I use `latest`, this error doesn't occur. However, I need to synchronize historical data. It might also be related to the fact that I enabled full table logging only after the database had been created and the data had already been written.

### SeaTunnel Version

apache-seatunnel-2.3.12

### SeaTunnel Config

```conf
env {
  # job.mode = "BATCH" https://seatunnel.apache.org/zh-CN/docs/2.3.12/connector-v2/source/PostgreSQL
  # Tuning guide
  # Increasing sink.buffer-size and doris.batch.size appropriately can improve write performance.
  # In streaming mode, if both doris.batch.size and checkpoint.interval are set to large values, the last data to arrive may be delayed considerably (by up to the checkpoint interval).
  # This is because the total volume of the last data to arrive may not exceed the doris.batch.size threshold, so until that threshold is reached, only a checkpoint triggers a commit. Choose a suitable checkpoint interval accordingly.
  # In addition, if 2PC is enabled with sink.enable-2pc=true, sink.buffer-size has no effect and only checkpoints trigger commits.
  job.name = "CONFIG_TEST_2026"
  # Streaming jobs must be set to STREAMING https://seatunnel.apache.org/zh-CN/docs/2.3.12/connector-v2/source/PostgreSQL-CDC
  job.mode = "STREAMING"
  # Checkpointing is mandatory for CDC jobs; it records the consumed offsets
  # Unit: milliseconds; here a checkpoint is taken every 10000 ms (10 s)
  checkpoint.interval = 10000
  # In production, tune the parallelism to the available resources
  execution.parallelism = 1
}

source {
  Postgres-CDC {
    slot.name = "ore_slot_test_20260112n1111nn"
    hostname = "192.168.232.1"
    port = 5432
    username = "postgres"
    password = "postgres"
    database-names = ["mining_production"]
    schema-names = ["public"]
    url = "jdbc:postgresql://192.168.232.1:5432/mining_production?loggerLevel=OFF"
    # Tables to synchronize
    table-names = [
      "mining_production.public.ore_batches",
      "mining_production.public.mill_equipment",
      "mining_production.public.flotation_equipment",
      "mining_production.public.production_batches",
      "mining_production.public.equipment_operations",
      "mining_production.public.quality_tests"
    ]
    # Logical decoding plugin; pgoutput is the recommended default for PG 10+
    decoding.plugin.name = "pgoutput"
    # Startup mode:
    # initial: synchronize historical data at startup, then synchronize incremental data.
    # earliest: start from the earliest possible offset.
    # latest: start from the latest offset.
    startup.mode = "initial"
    sslmode = "disable"
  }
}

# https://seatunnel.apache.org/zh-CN/docs/2.3.12/connector-v2/sink/Doris
sink {
  Doris {
    fenodes = "192.168.232.20:8030"
    username = "root"
    password = ""
    database = "mining_dw"
    # Dynamic table name mapping
    # PG table name: ore_batches -> Doris table name: ods_ore_batches
    # With the replacement variable, ${table_name} resolves to the source table name (without the schema)
    table = "ods_${table_name}"
    sink.label-prefix = "label_mining_sync"
    # Enable JSON format for Stream Load
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
    # Whether to enable two-phase commit (2PC); defaults to false. https://blog.csdn.net/weixin_42148384/article/details/149138892
    sink.enable-2pc = "false"
    # Important setting for CDC data: support for deletes and updates
    # Note: this requires a Doris table model that supports updates (see the notes below)
    sink.enable-delete = "false"
    # Automatic table creation
    # save_mode_create_template = """
    # CREATE TABLE IF NOT EXISTS ${table} (
    # ${columns}
    # )
    # ENGINE=OLAP
    # DUPLICATE KEY(${primary_key})
    # DISTRIBUTED BY HASH(${primary_key}) BUCKETS 10
    # PROPERTIES (
    # "replication_num" = "3"
    # )
    # """
  }
}

# Notes
# The Doris tables we created use the DUPLICATE KEY model.
# In a streaming CDC scenario this is a problem:
# Postgres operation: a user updates (Update) or deletes (Delete) a record.
# Duplicate Key behavior: the Doris Duplicate Key model simply appends data.
# For an Update, Doris inserts a new row with the modified data, and the old row remains.
# For a Delete, Doris by default cannot apply a physical delete on the Duplicate model (unless you only want to keep a log of deletions).
# Recommendation: change the Doris table model. If you want the Doris data to stay consistent with Postgres in real time (a mirror), rebuild the Doris tables with the UNIQUE KEY model.
# Summary:
# If you only want a log archive (recording every change), keep DUPLICATE KEY.
# If you want data synchronization (Doris state = current PG state), change the Doris tables to the UNIQUE KEY model.
```

### Running Command

```shell
./bin/seatunnel.sh --config ./job/ore_PostgresCDC_2_Doris.conf -m local
```

### Error Exception

```log
2026-01-12 14:14:16,257 ERROR [o.a.s.c.s.SeaTunnel ] [main] - ===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
	at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
	at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
	at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
	at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:679)
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1008)
	at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
	... 5 more
Caused by: io.debezium.DebeziumException: Creation of replication slot failed
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:218)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceScanFetcher.submitTask(IncrementalSourceScanFetcher.java:85)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.checkSplitOrStartNext(IncrementalSourceSplitReader.java:147)
	at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:71)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.FetchTask.run(FetchTask.java:54)
	at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:162)
	... 6 more
Caused by: org.postgresql.util.PSQLException: ERROR: replication slot "ore_slot_test_20260112n1111nn" already exists
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2736)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2421)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:372)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:525)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:435)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:357)
	at org.postgresql.jdbc.PgStatement.executeCachedSql(PgStatement.java:342)
	at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:313)
	at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createReplicationSlot(PostgresReplicationConnection.java:394)
	at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext.configure(PostgresSourceFetchTaskContext.java:210)
	... 11 more

	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
	... 2 more
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

<img width="1867" height="892" alt="Image" src="https://github.com/user-attachments/assets/98eb4fae-6f62-4c94-9a08-f9d94c819bd9" />

### Are you willing to submit PR?

- [x] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
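To help narrow this down, you can inspect what an earlier run left in PostgreSQL before restarting the job. The shell sketch below is hypothetical. The function names `slot_check_sql` and `replica_identity_sql` are not part of SeaTunnel. The functions only print SQL against the standard `pg_replication_slots` and `pg_class` catalogs. The sketch assumes that "full table logging" means `REPLICA IDENTITY FULL` was set on the tables.

```shell
#!/usr/bin/env bash
# Hypothetical diagnostic helpers (not part of SeaTunnel). They only print SQL;
# the output is meant to be run with psql against the source database.
# Assumes the slot and table names contain no single quotes.

# Prints a query showing whether the logical replication slot already exists,
# which output plugin it uses, and whether a client is currently attached.
slot_check_sql() {
  local slot="$1"
  printf "SELECT slot_name, plugin, slot_type, active, restart_lsn FROM pg_replication_slots WHERE slot_name = '%s';\n" "$slot"
}

# Prints a query showing the replica identity of a table in the public schema
# (relreplident: 'd' = default, 'f' = full, 'i' = index, 'n' = nothing).
replica_identity_sql() {
  local table="$1"
  printf "SELECT c.relname, c.relreplident FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = 'public' AND c.relname = '%s';\n" "$table"
}

# Example usage (requires psql and network access to the source database):
#   psql -h 192.168.232.1 -p 5432 -U postgres -d mining_production \
#     -c "$(slot_check_sql ore_slot_test_20260112n1111nn)"
#   psql -h 192.168.232.1 -p 5432 -U postgres -d mining_production \
#     -c "$(replica_identity_sql ore_batches)"
```

Interpret the first query's result as follows:

- **A row appears before the job starts:** the slot is left over from an earlier run.
- **No row appears before the job starts, but the error still occurs:** the slot was presumably created earlier within the same run, before the snapshot fetcher tried to create it again.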
