QiChunHui opened a new issue, #10997: URL: https://github.com/apache/seatunnel/issues/10997
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

In production we hit an intermittent problem: a **BATCH** job (JDBC MySQL source → JDBC `MultiTableSink`) finished reading all source data and the bounded reader closed normally, but the whole job then **stays in `RUNNING` forever and never turns `FINISHED`**. It cannot finish on its own; the only way to clear it is a manual cancel or a master restart.

This does not happen every time; it shows up intermittently under high job-submission load. Below are the clues we collected. We'd like the community's help in analyzing the possible causes and directions for improvement.

Environment & workload characteristics:

- SeaTunnel version: 2.3.13, Zeta engine.
- Deployment: separated master/worker, 2 masters (HA, 32C + 48G heap) + 8 workers (3 with 32C + 48G heap, 5 with 16C + 20G heap).
- **Workload**: the platform has **1000+ data-collection jobs**, submitted periodically by a scheduler, with **on average several dozen job submissions per minute** (higher at peak). The cluster therefore runs continuously under "sustained high-frequency submission + many small concurrent jobs".
- The job that hung is itself trivial: a single bounded table read, BATCH mode, and **no `checkpoint.interval` configured (i.e. checkpoint disabled)**.

### SeaTunnel Version

SeaTunnel version: 2.3.13, Zeta engine

### SeaTunnel Config

```conf
for master:
-- seatunnel.yaml
seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 30
    backup-count: 0
    queue-type: blockingqueue
    job-metrics-backup-interval: 60
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 1800
    job-metrics-partition-count: 4
    job-schedule-strategy: WAIT
    slot-service:
      dynamic-slot: false
    checkpoint:
      interval: 60000
      timeout: 600000
      storage: ...
    telemetry:
      metric:
        enabled: false
      logs:
        scheduled-deletion-enable: false
    http:
      enable-http: true
      port: 8010
      enable-dynamic-port: false

-- hazelcast-master.yaml
hazelcast:
  cluster-name: platform
  network:
    rest-api:
      enabled: false
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join: ...
    port:
      auto-increment: false
      port: 5801
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.operation.generic.thread.count: 50
    hazelcast.operation.call.timeout.millis: 60000
    hazelcast.operation.backup.timeout.millis: 60000
    hazelcast.heartbeat.failuredetector.type: phi-accrual
    hazelcast.heartbeat.interval.seconds: 5
    hazelcast.max.no.heartbeat.seconds: 300
    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
    hazelcast.operation.thread.count: 32
    hazelcast.io.thread.count: 32
  member-attributes:
    group:
      type: string
      value: default
    team:
      type: string
      value: default

for worker:
-- seatunnel.yaml
seatunnel:
  engine:
    classloader-cache-mode: true
    history-job-expire-minutes: 30
    backup-count: 0
    queue-type: blockingqueue
    job-metrics-backup-interval: 60
    print-execution-info-interval: 60
    print-job-metrics-info-interval: 1800
    job-metrics-partition-count: 4
    job-schedule-strategy: WAIT
    slot-service:
      dynamic-slot: false
      slot-num: 96
      slot-allocate-strategy: slot_ratio
    checkpoint:
      interval: 60000
      timeout: 600000
      storage: ...
    telemetry:
      metric:
        enabled: false
      logs:
        scheduled-deletion-enable: false
    http:
      enable-http: true
      port: 8010
      enable-dynamic-port: false

-- hazelcast-worker.yaml
hazelcast:
  cluster-name: platform
  network:
    join: ...
    port:
      auto-increment: false
      port: 5802
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30
    hazelcast.logging.type: log4j2
    hazelcast.io.thread.count: 32
    hazelcast.operation.generic.thread.count: 50
    hazelcast.operation.call.timeout.millis: 60000
    hazelcast.operation.backup.timeout.millis: 60000
    hazelcast.heartbeat.failuredetector.type: phi-accrual
    hazelcast.heartbeat.interval.seconds: 5
    hazelcast.max.no.heartbeat.seconds: 300
    hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
    hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
    hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
    hazelcast.operation.thread.count: 32
  member-attributes:
    group:
      type: string
      value: default
    team:
      type: string
      value: default
```

### Running Command

```shell
# start up
./bin/seatunnel-cluster.sh -d -r master
./bin/seatunnel-cluster.sh -d -r worker

# submit job: via the /submit-job REST API, using the JSON config
```

### Error Exception

```log
## Clues we found

### Clue 1: state of the three task threads while hung (from jstack)

| Role | Thread-stack location | Meaning |
|---|---|---|
| Source reader (taskGroup 2) | looping in `SourceFlowLifeCycle.collect:194` (sleep) | the reader's internal `prepareClose=true`, but the **task-level `prepareCloseStatus=false`; the task is still RUNNING** |
| Sink side (taskGroup 2) | blocked in `IntermediateBlockingQueue.collect:54` (poll) | waiting forever for data or a close marker on the intermediate queue that never arrives |
| SplitEnumerator (taskGroup 1) | `SourceSplitEnumeratorTask.stateProcess:350` (PREPARE_CLOSE) | has already detected that all readers finished and already sent `LastCheckpointNotifyOperation` to master; waiting for `closeCalled` |

The source finished reading at `17:48:42`; after that there is **almost no further log output** for this job on either the worker or the master.

### Clue 2: how a bounded BATCH job is supposed to finish (for reference)

Normally a bounded BATCH job is closed by a "final checkpoint":

1. the reader finishes → `signalNoMoreElement()` → notifies the enumerator;
2. the enumerator detects that all readers finished → sends `LastCheckpointNotifyOperation` to master → `CheckpointCoordinator.readyToClose()`;
3. master triggers the final checkpoint `tryTriggerPendingCheckpoint(COMPLETED_POINT_TYPE)`;
4. this close barrier flows enumerator → reader → sink; each task acks, and the barrier sets `prepareCloseStatus` to true on every task, advancing each task `RUNNING → PREPARE_CLOSE → CLOSED`;
5. once all acks are complete, master tells the tasks to close and the job finishes.

In the hung job, step 3 appears to have happened (the enumerator is already in PREPARE_CLOSE), but the **source reader's `prepareCloseStatus` was never set**, i.e. the close-barrier handshake never completed or never reached the reader.

### Clue 3: in the same time window, worker↔master Hazelcast communication was massively timing out and the connection was flapping

In the logs of the worker that ran the source/sink (172.18.1.145) and of the active master (172.18.1.143), **right in the window when the job tried to close (around the source close at `17:48:42`)**, cluster-internal communication was badly degraded:

- the master logged **1419 `Slow operation detected: ReportMetricsOperation`** warnings within one hour; `ReportMetricsOperation` (partitionId=-1) was hogging almost all generic-operation threads;
- several operations from the worker to the master were **rejected by the 60s operation-call-timeout**, e.g. `JobEventReportOperation` (17:46:36 and 17:48:03, with the connection already `alive=false`) and `ReportMetricsOperation` (17:48:59, `ERROR ... failed to update metrics`);
- the TCP cluster connection worker(145)↔master(143) was **re-initialized 6 times between 17:39 and 17:56** (each reconnect drops the operations in flight on that connection); meanwhile **no Hazelcast cluster membership change occurred** (no master failover): only the connection flapped;
- shortly after, `SourceRegisterOperation` was flagged as a slow operation (17:50:37) and then **failed after 30 retries** (17:51:27), confirming that operations were being lost during this period.

### Clue 4: the close handshake relies on a chain of Hazelcast operations

The close flow in Clue 2 is, in implementation, a chain of cross-node Hazelcast operations: `LastCheckpointNotifyOperation` (worker→master), `CheckpointBarrierTriggerOperation` (master→enumerator), `BarrierFlowOperation` (enumerator→reader), `TaskAcknowledgeOperation` (reader→master). If any one of these is lost (in a connection drop or a 60s-timeout rejection) and is not retried, the close flow cannot complete.

We also noticed that when `checkpoint.interval` is not configured (checkpoint disabled), the final checkpoint used to close the bounded source does not seem to have the same timeout detection that a normal checkpoint has (this is only an observation; we're not sure whether it is by design).

## Our questions / what we'd like the community to look at

1. Under our high-frequency scenario (1000+ jobs, several dozen submissions per minute), `ReportMetricsOperation` and similar internal operations saturate the master's operation threads and cause the worker↔master connection to repeatedly time out and reconnect. Is this a known scalability bottleneck? How does the community usually mitigate it?
2. For this phenomenon, what does the community think are the **possible root causes** and the **directions for future optimization**? (Whether at the config, deployment, or engine level, we'd love to hear any thoughts.)
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

Java: 8

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
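To make the failure mode in Clue 1 and Clue 2 concrete, here is a toy, single-process Java model of the final-checkpoint handshake. It is a sketch under stated assumptions, not SeaTunnel code. `CloseHandshakeSketch`, `Role`, `TaskState` and `runFinalCheckpoint` are hypothetical names. The network is reduced to a "drop the barrier before this task" switch, and acks are counted directly instead of being sent as `TaskAcknowledgeOperation`s. Dropping the barrier before the reader reproduces the picture from the jstack: the enumerator sits in PREPARE_CLOSE, the reader and sink stay RUNNING, and the job never reaches CLOSED.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical toy model of the final-checkpoint close handshake (not SeaTunnel code).
public class CloseHandshakeSketch {

    enum TaskState { RUNNING, PREPARE_CLOSE, CLOSED }

    /** Barrier order: enumerator -> reader -> sink (declaration order). */
    enum Role { ENUMERATOR, READER, SINK }

    /**
     * Runs the final checkpoint once. {@code lostBefore} is the task whose
     * incoming barrier is dropped (null means nothing is lost).
     */
    static Map<Role, TaskState> runFinalCheckpoint(Role lostBefore) {
        Map<Role, TaskState> states = new EnumMap<>(Role.class);
        for (Role role : Role.values()) {
            states.put(role, TaskState.RUNNING);
        }

        int acks = 0;
        for (Role role : Role.values()) {
            if (role == lostBefore) {
                break; // barrier never arrives: this task and everything downstream stay RUNNING
            }
            states.put(role, TaskState.PREPARE_CLOSE); // models prepareCloseStatus = true
            acks++;                                     // task acknowledges the barrier
        }

        // The coordinator only closes the job once every task has acknowledged.
        if (acks == Role.values().length) {
            for (Role role : Role.values()) {
                states.put(role, TaskState.CLOSED);
            }
        }
        return states;
    }

    public static void main(String[] args) {
        System.out.println(runFinalCheckpoint(null));
        // {ENUMERATOR=CLOSED, READER=CLOSED, SINK=CLOSED}
        System.out.println(runFinalCheckpoint(Role.READER));
        // {ENUMERATOR=PREPARE_CLOSE, READER=RUNNING, SINK=RUNNING}
    }
}
```

Nothing in this model re-sends a lost barrier or times out. A single lost hop therefore leaves the job short of CLOSED indefinitely. In the real cluster, that corresponds to the manual cancel or master restart described under "What happened".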

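One direction raised by Clue 4 is to give the final checkpoint a bounded lifetime. A lost operation would then turn into either a successful re-send or a visible failure, rather than an indefinite `RUNNING`. The sketch below shows that general pattern in plain Java 8: re-send until acknowledged, up to a retry budget, and give up once a deadline passes. It is hypothetical. `FinalCheckpointWatchdog`, `driveToAck` and `Outcome` are illustrative names, not SeaTunnel or Hazelcast APIs. The clock is injected so that the behavior is deterministic.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical retry-with-deadline pattern; not a SeaTunnel or Hazelcast API.
public class FinalCheckpointWatchdog {

    /** Result of driving one hypothetical close-handshake operation. */
    enum Outcome { ACKED, RETRIES_EXHAUSTED, TIMED_OUT }

    /**
     * Re-sends an operation until it is acknowledged, the retry budget is spent,
     * or the deadline passes. {@code send} returns true when an ack came back;
     * false models an operation dropped by a reconnect or a call timeout.
     */
    static Outcome driveToAck(BooleanSupplier send, int maxAttempts,
                              long timeoutMillis, LongSupplier clockMillis) {
        long deadline = clockMillis.getAsLong() + timeoutMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (clockMillis.getAsLong() >= deadline) {
                // Surface a failure instead of waiting forever in RUNNING.
                return Outcome.TIMED_OUT;
            }
            if (send.getAsBoolean()) {
                return Outcome.ACKED;
            }
        }
        return Outcome.RETRIES_EXHAUSTED;
    }

    public static void main(String[] args) {
        // The link drops the first two sends, then delivers the third.
        int[] calls = {0};
        Outcome flaky = driveToAck(() -> ++calls[0] > 2, 5, 60_000, () -> 0L);
        System.out.println(flaky + " after " + calls[0] + " sends"); // ACKED after 3 sends

        // Every send is lost and (simulated) blocks for 30s, so the 60s deadline wins.
        long[] now = {0L};
        Outcome stuck = driveToAck(() -> { now[0] += 30_000; return false; },
                                   10, 60_000, () -> now[0]);
        System.out.println(stuck); // TIMED_OUT
    }
}
```

A real implementation would also need the receiving side to tolerate duplicate barriers or acks. After a reconnect, a re-send may race with an original that was delayed rather than lost.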