QiChunHui opened a new issue, #10997:
URL: https://github.com/apache/seatunnel/issues/10997

   ### Search before asking
   
   - [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.
   
   
   ### What happened
   
   In production we hit an intermittent problem: a **BATCH** job (JDBC MySQL 
source → JDBC
   `MultiTableSink`) finished reading all source data and the bounded reader 
closed normally, but the
   whole job then **stays in `RUNNING` forever and never turns `FINISHED`**. It 
cannot finish on its
   own — the only way to clear it is a manual cancel or a master restart.
   
   This does not happen every time; it shows up intermittently under a high 
job-submission load.
   Below are the clues we collected. We'd like the community's help to analyze 
the possible causes and
   directions for improvement.
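
A hung job like this only shows up as a job that stays in `RUNNING` without making progress, so a scheduler-side check can at least surface it early. The following is a minimal sketch, assuming the scheduler keeps a status and a last-progress timestamp per job. The record layout, the `find_stuck_jobs` name, the example timestamps, and the 30-minute threshold are all hypothetical and not part of SeaTunnel.

```python
from datetime import datetime, timedelta


def find_stuck_jobs(jobs, now, max_idle=timedelta(minutes=30)):
    """Return ids of jobs that are RUNNING but showed no progress for max_idle.

    jobs: iterable of dicts with the hypothetical keys
          "id", "status" and "last_progress" (a datetime).
    """
    return [
        job["id"]
        for job in jobs
        if job["status"] == "RUNNING" and now - job["last_progress"] > max_idle
    ]


# Hypothetical snapshot of three jobs as a scheduler might record them.
now = datetime(2024, 1, 1, 18, 30, 0)
jobs = [
    {"id": "job-a", "status": "RUNNING", "last_progress": datetime(2024, 1, 1, 17, 48, 42)},
    {"id": "job-b", "status": "RUNNING", "last_progress": datetime(2024, 1, 1, 18, 25, 0)},
    {"id": "job-c", "status": "FINISHED", "last_progress": datetime(2024, 1, 1, 17, 0, 0)},
]

stuck = find_stuck_jobs(jobs, now)
print(stuck)
```

Such a check does not fix the hang; it only shortens the time until someone cancels the job.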
   
   Environment & workload characteristics
   
   - SeaTunnel version: 2.3.13, Zeta engine.
   - Deployment: separated master/worker; 2 masters (HA, 32C + 48G heap each) and 8 workers (3 with 32C + 48G heap, 5 with 16C + 20G heap).
   - **Workload**: the platform has **1000+ data-collection jobs**, submitted 
periodically by a
     scheduler, with **on average several dozen job submissions per minute** 
(higher at peak). So the
     cluster runs continuously under "sustained high-frequency submission + 
many small concurrent jobs".
   - The job that hung is itself trivial: a single bounded table read, BATCH 
mode, and **no
     `checkpoint.interval` configured (i.e. checkpoint disabled)**.
   
   ### SeaTunnel Version
   
   SeaTunnel version: 2.3.13, Zeta engine
   
   ### SeaTunnel Config
   
   ```conf
   # ===== master =====

   # seatunnel.yaml
   seatunnel:
     engine:
       classloader-cache-mode: true
       history-job-expire-minutes: 30
       backup-count: 0
       queue-type: blockingqueue
       job-metrics-backup-interval: 60
       print-execution-info-interval: 60
       print-job-metrics-info-interval: 1800
       job-metrics-partition-count: 4
       job-schedule-strategy: WAIT
       slot-service:
         dynamic-slot: false
       checkpoint:
         interval: 60000
         timeout: 600000
         storage:
           ...
       telemetry:
         metric:
           enabled: false
         logs:
           scheduled-deletion-enable: false
       http:
         enable-http: true
         port: 8010
         enable-dynamic-port: false
   
   # hazelcast-master.yaml
   hazelcast:
     cluster-name: platform
     network:
       rest-api:
         enabled: false
         endpoint-groups:
           CLUSTER_WRITE:
             enabled: true
           DATA:
             enabled: true
       join:
         ...
       port:
         auto-increment: false
         port: 5801
     properties:
       hazelcast.invocation.max.retry.count: 20
       hazelcast.tcp.join.port.try.count: 30
       hazelcast.logging.type: log4j2
       hazelcast.operation.generic.thread.count: 50
       hazelcast.operation.call.timeout.millis: 60000
       hazelcast.operation.backup.timeout.millis: 60000
       hazelcast.heartbeat.failuredetector.type: phi-accrual
       hazelcast.heartbeat.interval.seconds: 5
       hazelcast.max.no.heartbeat.seconds: 300
       hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
       hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
       hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
       hazelcast.operation.thread.count: 32
       hazelcast.io.thread.count: 32
     member-attributes:
       group:
         type: string
         value: default
       team:
         type: string
         value: default
   
   # ===== worker =====

   # seatunnel.yaml
   seatunnel:
     engine:
       classloader-cache-mode: true
       history-job-expire-minutes: 30
       backup-count: 0
       queue-type: blockingqueue
       job-metrics-backup-interval: 60
       print-execution-info-interval: 60
       print-job-metrics-info-interval: 1800
       job-metrics-partition-count: 4
       job-schedule-strategy: WAIT
       slot-service:
         dynamic-slot: false
         slot-num: 96
         slot-allocate-strategy: slot_ratio
       checkpoint:
         interval: 60000
         timeout: 600000
         storage:
           ...
       telemetry:
         metric:
           enabled: false
         logs:
           scheduled-deletion-enable: false
       http:
         enable-http: true
         port: 8010
         enable-dynamic-port: false
   
   # hazelcast-worker.yaml
   hazelcast:
     cluster-name: platform
     network:
       join:
         ...
       port:
         auto-increment: false
         port: 5802
     properties:
       hazelcast.invocation.max.retry.count: 20
       hazelcast.tcp.join.port.try.count: 30
       hazelcast.logging.type: log4j2
       hazelcast.io.thread.count: 32
       hazelcast.operation.generic.thread.count: 50
       hazelcast.operation.call.timeout.millis: 60000
       hazelcast.operation.backup.timeout.millis: 60000
       hazelcast.heartbeat.failuredetector.type: phi-accrual
       hazelcast.heartbeat.interval.seconds: 5
       hazelcast.max.no.heartbeat.seconds: 300
       hazelcast.heartbeat.phiaccrual.failuredetector.threshold: 10
       hazelcast.heartbeat.phiaccrual.failuredetector.sample.size: 200
       hazelcast.heartbeat.phiaccrual.failuredetector.min.std.dev.millis: 100
       hazelcast.operation.thread.count: 32
     member-attributes:
       group:
         type: string
         value: default
       team:
         type: string
         value: default
   ```
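
The timing-related Hazelcast properties above can be related to each other with some simple arithmetic. The values are copied from the config; the interpretation (that `hazelcast.max.no.heartbeat.seconds` bounds how long a silent member is tolerated before it is suspected) is an assumption about the failure detector and should be checked against the Hazelcast documentation.

```python
# Values copied from the hazelcast-master.yaml / hazelcast-worker.yaml properties.
props = {
    "hazelcast.operation.call.timeout.millis": 60000,
    "hazelcast.heartbeat.interval.seconds": 5,
    "hazelcast.max.no.heartbeat.seconds": 300,
}

call_timeout_s = props["hazelcast.operation.call.timeout.millis"] / 1000
heartbeat_s = props["hazelcast.heartbeat.interval.seconds"]
max_silence_s = props["hazelcast.max.no.heartbeat.seconds"]

# Consecutive heartbeats that may be missed before the silence limit is reached.
missed_heartbeats = max_silence_s // heartbeat_s

# Back-to-back operation-call timeouts that fit into the same window.
timeouts_in_window = max_silence_s / call_timeout_s

print(f"heartbeats that may be missed: {missed_heartbeats}")
print(f"call timeouts that fit before the silence limit: {timeouts_in_window:.0f}")
```

Under that assumption, operations can time out several times over while the member is still considered alive, which would be consistent with seeing timeouts and reconnects but no membership change.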
   
   ### Running Command
   
   ```shell
   # start the cluster (one command per node role)
   ./bin/seatunnel-cluster.sh -d -r master
   ./bin/seatunnel-cluster.sh -d -r worker
   # submit job: via the /submit-job API, using the JSON config
   ```
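
For readers who want to reproduce the submission path, here is a minimal sketch of how a scheduler might build a `/submit-job` request. Port 8010 comes from the `http.port` setting above. The host name, the `jobName` query parameter, the `build_submit_request` helper, and the body layout are assumptions for illustration, not our actual submitter, and the connector options are intentionally omitted.

```python
import json
from urllib import parse, request


def build_submit_request(host, job_name, job_config, port=8010):
    """Build (but do not send) a POST request for the /submit-job endpoint."""
    query = parse.urlencode({"jobName": job_name})
    url = f"http://{host}:{port}/submit-job?{query}"
    body = json.dumps(job_config).encode("utf-8")
    return request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Hypothetical bounded BATCH job. No checkpoint.interval is set in env,
# mirroring the hung job described in this issue. Connector options omitted.
job_config = {
    "env": {"job.mode": "BATCH"},
    "source": [{"plugin_name": "Jdbc"}],
    "sink": [{"plugin_name": "Jdbc"}],
}

req = build_submit_request("master-host.example", "demo_batch_job", job_config)
print(req.get_method(), req.full_url)
# request.urlopen(req, timeout=30) would send it; omitted so the sketch has no side effects.
```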
   
   ### Error Exception
   
   ```log
   ## Clues we found
   
   ### Clue 1: state of the three task threads while hung (from jstack)
   
   | Role | Thread-stack location | Meaning |
   |---|---|---|
   | Source reader (taskGroup 2) | looping in `SourceFlowLifeCycle.collect:194` (sleep) | reader's internal `prepareClose=true`, but the **task-level `prepareCloseStatus=false`, still in RUNNING** |
   | Sink side (taskGroup 2) | blocked in `IntermediateBlockingQueue.collect:54` (poll) | waiting forever for data / a close marker on the intermediate queue that never arrives |
   | SplitEnumerator (taskGroup 1) | `SourceSplitEnumeratorTask.stateProcess:350` (PREPARE_CLOSE) | already detected all readers finished, already sent `LastCheckpointNotifyOperation` to master, waiting for `closeCalled` |
   
   The source finished reading at `17:48:42`; after that there is **almost no further log output** for
   this job on either the worker or the master.
   
   ### Clue 2: how a bounded BATCH job is supposed to finish (for reference)
   
   Normally a bounded BATCH job is closed by a "final checkpoint":
   
   1. reader finishes → `signalNoMoreElement()` → notifies the enumerator;
   2. enumerator detects all readers finished → sends 
`LastCheckpointNotifyOperation` to master →
      `CheckpointCoordinator.readyToClose()`;
   3. master triggers the final checkpoint 
`tryTriggerPendingCheckpoint(COMPLETED_POINT_TYPE)`;
   4. this close barrier flows enumerator → reader → sink, each acks, and it 
sets `prepareCloseStatus`
      to true on every task, advancing each task `RUNNING → PREPARE_CLOSE → 
CLOSED`;
   5. once all complete, master tells the tasks to close and the job finishes.
   
   In the hung job, step 3 appears to have happened (the enumerator is already 
in PREPARE_CLOSE), but
   the **source reader's `prepareCloseStatus` was never set** — i.e. the 
close-barrier handshake never
   completed / never reached the reader.
   
   ### Clue 3: in the same time window, worker↔master Hazelcast communication 
was massively timing out and the connection was flapping
   
   In the logs of the worker that ran the source/sink (172.18.1.145) and the 
active master
   (172.18.1.143), **right in the window when the job tried to close (around 
source close at
   `17:48:42`)**, cluster-internal communication was badly degraded:
   
   - the master logged **1419 `Slow operation detected: 
ReportMetricsOperation`** warnings within one
     hour; `ReportMetricsOperation` (partitionId=-1) was hogging almost all 
generic-operation threads;
   - several operations from the worker to the master were **rejected by the 60s
     operation-call-timeout**, e.g. `JobEventReportOperation` (17:46:36, 
17:48:03, with the connection
     already `alive=false`) and `ReportMetricsOperation` (17:48:59, `ERROR ... 
failed to update metrics`);
   - the TCP cluster connection worker(145)↔master(143) was **re-initialized 6 
times between 17:39 and
     17:56** (each reconnect drops the operations in flight on that 
connection); meanwhile **no
     Hazelcast cluster membership change occurred** (no master failover) — only 
the connection flapped;
   - shortly after, `SourceRegisterOperation` was flagged as a slow operation 
(17:50:37) and then
     **failed after 30 retries** (17:51:27), confirming operations were being 
lost during this period.
   
   ### Clue 4: the close handshake relies on a chain of Hazelcast operations
   
   The close flow described in Clue 2 is implemented as a chain of cross-node Hazelcast operations:
   `LastCheckpointNotifyOperation` (worker→master), 
`CheckpointBarrierTriggerOperation`
   (master→enumerator), `BarrierFlowOperation` (enumerator→reader), 
`TaskAcknowledgeOperation`
   (reader→master). If any one of these is lost (in a connection drop / 
60s-timeout rejection) and is
   not retried, the close flow cannot complete.
   
   We also noticed that when `checkpoint.interval` is not configured 
(checkpoint disabled), this final
   checkpoint used to close the bounded source does not seem to have the same 
timeout detection that a
   normal checkpoint has (this is only an observation; we're not sure whether 
it is by design).
   
   ## Our questions / what we'd like the community to look at
   
   1. Under our "1000+ jobs, several dozen submissions per minute" 
high-frequency scenario,
      `ReportMetricsOperation` and similar internal operations saturate the 
master's operation threads
      and cause the worker↔master connection to repeatedly time out / reconnect 
— is this a known
      scalability bottleneck? How does the community usually mitigate it?
   2. For this phenomenon, what does the community think are the **possible 
root causes** and the
      **directions for future optimization**? (Whether at the config, 
deployment, or engine level —
      we'd love to hear any thoughts.)
   ```
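
To make Clue 2 concrete, here is a toy model of the close handshake. It is not SeaTunnel code: the `Task` class and `run_close` function are hypothetical, and only the state names (`RUNNING`, `PREPARE_CLOSE`, `CLOSED`) and the barrier order enumerator → reader → sink are taken from the description above. It shows how a barrier that is dropped before reaching the reader leaves the reader and sink in `RUNNING` while the enumerator waits in `PREPARE_CLOSE`.

```python
from dataclasses import dataclass


@dataclass
class Task:
    name: str
    state: str = "RUNNING"
    prepare_close_status: bool = False

    def on_barrier(self):
        # Receiving the close barrier sets the flag and advances the state.
        self.prepare_close_status = True
        self.state = "PREPARE_CLOSE"

    def on_close(self):
        self.state = "CLOSED"


def run_close(lost_before=None):
    """Send the close barrier enumerator -> reader -> sink.

    lost_before names the task the barrier never reaches (None = no loss).
    Returns the final state of each task.
    """
    tasks = [Task("enumerator"), Task("reader"), Task("sink")]
    acks = 0
    for task in tasks:
        if task.name == lost_before:
            break  # barrier dropped: this task and everything downstream never see it
        task.on_barrier()
        acks += 1
    if acks == len(tasks):
        # The master only closes the tasks once every one of them has acked.
        for task in tasks:
            task.on_close()
    return {t.name: t.state for t in tasks}


print(run_close())                      # healthy run
print(run_close(lost_before="reader"))  # barrier lost on the way to the reader
```

The second call reproduces the combination of states seen in the jstack table of Clue 1: nothing in this model ever re-sends the barrier, so the job would stay there indefinitely.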
   
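The failure mode suspected in Clue 4 (one lost operation stalls the whole chain) and the missing timeout can be sketched together. This is a simulation under stated assumptions, not engine code: `send_chain`, the `lossy` transport, and the attempt budget are hypothetical, and only the four operation names come from the text above. It contrasts fire-and-forget delivery with bounded retries and fails loudly when the budget is exhausted instead of hanging.

```python
CLOSE_CHAIN = [
    "LastCheckpointNotifyOperation",      # worker -> master
    "CheckpointBarrierTriggerOperation",  # master -> enumerator
    "BarrierFlowOperation",               # enumerator -> reader
    "TaskAcknowledgeOperation",           # reader -> master
]


class CloseTimeout(Exception):
    """Raised when the close chain cannot complete within its budget."""


def send_chain(deliver, max_attempts=1):
    """Deliver each operation in order, trying each up to max_attempts times.

    deliver(op, attempt) returns True if the operation arrived.
    Returns the total number of sends; raises CloseTimeout if an operation is lost.
    """
    sends = 0
    for op in CLOSE_CHAIN:
        for attempt in range(1, max_attempts + 1):
            sends += 1
            if deliver(op, attempt):
                break
        else:
            # No attempt succeeded: surface the failure instead of waiting forever.
            raise CloseTimeout(f"{op} lost after {max_attempts} attempt(s)")
    return sends


def lossy(op, attempt):
    # Hypothetical transport: the first send of BarrierFlowOperation is dropped,
    # as if the connection had been re-initialized while it was in flight.
    return not (op == "BarrierFlowOperation" and attempt == 1)


try:
    send_chain(lossy, max_attempts=1)
except CloseTimeout as exc:
    print("no retry:", exc)

print("with retry, total sends:", send_chain(lossy, max_attempts=3))
```

Either behaviour (a retry that succeeds, or an explicit failure) would let the job leave `RUNNING`; the hang only arises when a lost operation is neither retried nor timed out.
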
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   Java: 8
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
