hmalaspina opened a new issue, #10149:
URL: https://github.com/apache/seatunnel/issues/10149
Hello,
I am having issues with using the Postgres-CDC source connector.
I have a setup that streams the captured changes on a Postgres table to an
Iceberg table.
In our preproduction environment, which has very low traffic, it works fine.
In our production environment it periodically gets
`java.net.SocketException: Socket is closed`, which leads to the job being cancelled
after too many exceptions.
I have tried increasing Postgres `wal_sender_timeout` to 20 minutes; it does not
make any difference compared with the default of 1 minute.
Some information about our setup:
The job runs with the Zeta engine in a SeaTunnel cluster of 2 masters and
2 workers, all in EKS.
The Postgres DB version is 16.8. The Postgres instance and the Seatunnel
cluster are in the same VPC.
SeaTunnel has been deployed using the provided Helm chart.
The only meaningful change made to values.yaml is pinning the SeaTunnel
version to 2.3.11, as I have never managed to get the Postgres connector to
connect to our table with 2.3.12.
Here is the full diff of the values for the sake of completeness:
```
Common subdirectories: seatunnel-helm-original/conf and seatunnel-helm/conf
Common subdirectories: seatunnel-helm-original/templates and seatunnel-helm/templates
diff --color=auto seatunnel-helm-original/values.yaml seatunnel-helm/values.yaml
24c24
< tag: ""
---
> tag: "2.3.11"
31c31
< value: Asia/Shanghai
---
> value: UTC
48,51c48,51
< prometheus.io/path: /hazelcast/rest/instance/metrics
< prometheus.io/port: "5801"
< prometheus.io/scrape: "true"
< prometheus.io/role: "seatunnel-master"
---
> # prometheus.io/path: /hazelcast/rest/instance/metrics
> # prometheus.io/port: "5801"
> # prometheus.io/scrape: "true"
> # prometheus.io/role: "seatunnel-master"
58c58,59
< nodeSelector: {}
---
> nodeSelector:
> node.family: on-demand
61c62,66
< tolerations: []
---
> tolerations:
> - key: node.family
> operator: Equal
> value: on-demand
> effect: NoSchedule
64,71c69,75
< resources: {}
< # resources:
< # limits:
< # memory: "4Gi"
< # cpu: "4"
< # requests:
< # memory: "2Gi"
< # cpu: "500m"
---
> resources:
> limits:
> memory: "4Gi"
> cpu: "2"
> requests:
> memory: "4Gi"
> cpu: "2"
111a116
> karpenter.sh/do-not-disrupt: "true"
124,131c129,135
< resources: {}
< # resources:
< # limits:
< # memory: "4Gi"
< # cpu: "4"
< # requests:
< # memory: "2Gi"
< # cpu: "500m"
---
> resources:
> limits:
> memory: "10Gi"
> cpu: "2"
> requests:
> memory: "10Gi"
> cpu: "2"
160a165,196
>
> # Secret configuration for database credentials
> secret:
> # Enable secret creation
> enabled: true
> # Secret name
> name: "postgres-credentials"
> # Database credentials
> dbUser: "redacted"
> dbPassword: "redacted" # your password here
>
> # Persistent Volume Claim for checkpoint storage
> persistence:
> # Enable PVC creation for checkpoint storage
> enabled: true
> # PVC name
> name: "seatunnel-checkpoint-pvc"
> # Access mode
> accessModes:
> - ReadWriteMany
> # Storage size
> size: 10Gi
> # Storage class name (optional)
> storageClassName: "efs-sc"
> # Mount path for checkpoints
> mountPath: "/tmp/seatunnel/checkpoint_snapshot"
>
> # ServiceAccount configuration
> serviceAccount:
> # Annotations for the ServiceAccount (e.g., EKS IAM role)
> annotations:
> eks.amazonaws.com/role-arn: "redacted
```
Here are the current Postgres parameters we are running with.
```
wal_sender_timeout 6min
tcp_keepalives_idle 300
tcp_keepalives_interval 30
tcp_keepalives_count 3
```
This is our job definition:
```
# SeaTunnel Job: PostgreSQL CDC to Iceberg Test Table
# Streams real-time changes from PostgreSQL redacted to Iceberg table redacted
env {
execution.parallelism = 1
job.retry.times = 3
job.mode = "STREAMING"
job.name = "redacted"
# Enable checkpointing for fault tolerance - every 5 minutes; low-frequency checkpoints allow for less frequent writes to Iceberg
checkpoint.interval = 300000
checkpoint.timeout = 900000
read_limit.rows_per_second = 2500
}
source {
Postgres-CDC {
# Plugin identifier
plugin_output = "postgres_cdc_source"
# Database connection
base-url = "jdbc:postgresql://redacted/redacted?tcpKeepAlive=true"
username = "redacted"
password = ${DB_PASSWORD}
# Database and schema to monitor
database-names = ["redacted"]
schema-names = ["public"]
# Tables to monitor (format: database.schema.table)
table-names = ["redacted"]
# Startup mode
# LATEST: Skip snapshot, start streaming from current WAL position (streaming only)
# INITIAL: Read full snapshot, then continue streaming (recommended for CDC)
# NOTE: After job failure, start fresh job (don't use -r resume, it's broken).
# INITIAL mode + replication slot = automatic recovery from last committed position
startup.mode = "INITIAL"
slot.name = "seatunnel_iceberg_test_slot"
# Decoding plugin (pgoutput is recommended for PostgreSQL 10+)
decoding.plugin.name = "pgoutput"
# Snapshot configuration - optimized for high throughput with 16GB pod
# With 146M rows and 500k chunks = ~300 splits (manageable memory)
snapshot.split.size = 500000 # 500k rows per split (CRITICAL - keeps split count low)
snapshot.fetch.size = 30000 # 30k rows per fetch (high performance)
# Incremental snapshot chunk size (for parallel snapshot reading)
scan.incremental.snapshot.chunk.size = 200000
# Connection settings
connect.timeout.ms = 800000
connect.max-retries = 6
connection.pool.size = 20
# Server timezone
server-time-zone = "UTC"
# Exactly-once semantics (recommended for production)
exactly_once = true
# Pass-through Debezium properties
# Use existing publication created by table owner
debezium = {
"publication.autocreate.mode" = "disabled"
"publication.name" = "seatunnel_cdc_publication"
# keepalive / heartbeat
}
# Output format
format = "DEFAULT"
}
}
sink {
Iceberg {
plugin_input = "postgres_cdc_source"
catalog_name = "glue_catalog"
catalog_type = "glue"
iceberg.table.upsert-mode-enabled = true
primary_keys = "redacted"
# AWS Glue catalog configuration with AssumeRoleAwsClientFactory
iceberg.catalog.config = {
"catalog-impl" = "org.apache.iceberg.aws.glue.GlueCatalog"
"warehouse" = "redacted"
"io-impl" = "org.apache.iceberg.aws.s3.S3FileIO"
"glue.account-id" = "redacted"
# Use AssumeRoleAwsClientFactory for cross-account access
"client.factory" = "org.apache.iceberg.aws.AssumeRoleAwsClientFactory"
"client.assume-role.arn" = "redacted"
"client.assume-role.region" = "redacted
"write.update.mode" = "merge-on-read"
"write.delete.mode" = "merge-on-read"
# "write.target-file-size-bytes" = "67108864" # 64MB files
}
iceberg.table.write-props = {
"write.update.mode" = "merge-on-read"
"write.delete.mode" = "merge-on-read"
}
namespace = "redacted"
table = "redacted"
}
}
```
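For reference, the empty `# keepalive / heartbeat` comment in the `debezium` block above is where I have been meaning to experiment with keepalive-related pass-through settings. Below is a minimal sketch of what I had in mind, assuming the standard Debezium PostgreSQL connector properties `status.update.interval.ms` and `heartbeat.interval.ms` are honored by the pass-through block; I have not validated these values in our environment.
```
debezium = {
  "publication.autocreate.mode" = "disabled"
  "publication.name" = "seatunnel_cdc_publication"
  # Assumption: send replication status updates well below wal_sender_timeout (6 min)
  "status.update.interval.ms" = "10000"
  # Assumption: emit heartbeat events so the slot's confirmed LSN keeps advancing on quiet tables
  "heartbeat.interval.ms" = "60000"
}
```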
This is our stack trace when the exception happens:
```
Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: SeaTunnel job executed failed
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:228)
    at org.apache.seatunnel.core.starter.SeaTunnel.run(SeaTunnel.java:40)
    at org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient.main(SeaTunnelClient.java:40)
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:147)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:167)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:93)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:119)
    at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:159)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:127)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:165)
    at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:132)
    at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:694)
    at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1023)
    at org.apache.seatunnel.api.tracing.MDCRunnable.run(MDCRunnable.java:43)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    at org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:81)
    ... 5 more
Caused by: org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted.
    at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:180)
    at org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.wal.PostgresWalFetchTask.execute(PostgresWalFetchTask.java:74)
    at org.apache.seatunnel.connectors.cdc.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:107)
    ... 5 more
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1166)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:44)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:160)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:125)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:82)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(PostgresReplicationConnection.java:504)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(PostgresStreamingChangeEventSource.java:215)
    at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:177)
    ... 7 more
Caused by: java.net.SocketException: Socket is closed
    at java.net.Socket.setSoTimeout(Socket.java:1155)
    at sun.security.ssl.BaseSSLSocketImpl.setSoTimeout(BaseSSLSocketImpl.java:639)
    at sun.security.ssl.SSLSocketImpl.setSoTimeout(SSLSocketImpl.java:73)
    at org.postgresql.core.PGStream.hasMessagePending(PGStream.java:210)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1208)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1164)
    ... 14 more
    at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute(ClientExecuteCommand.java:220)
    ... 2 more
2025-12-03 00:17:29,651 INFO [s.c.s.s.c.ClientExecuteCommand] [SeaTunnel-CompletableFuture-Thread-0] - run shutdown hook because get close signal
```
And at the same time we see this in the logs of our database.
```
SSL error: SSLV3_ALERT_UNEXPECTED_MESSAGE
LOG: could not receive data from client: Connection reset by peer
LOG: unexpected EOF on standby connection
CONTEXT: slot "seatunnel_iceberg_test_slot", output plugin "pgoutput"
```
Does anyone know about this issue?