VincentSleepless opened a new issue, #4306: URL: https://github.com/apache/incubator-seatunnel/issues/4306
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened version 2.3.1 dev engine zeta connector version : v2 connector : maxcompute sink task mode : batch local error describe: when i submit a mysql->maxcompute bacth job with zeta engine local mode, when job is nearly finished,the maxcompute sink close function commit a not exists block id . ### SeaTunnel Version 2.3.1 dev ### SeaTunnel Config ```conf env { execution.parallelism = 10 job.name = "mysql_maxcompute_test" job.mode = "BATCH" checkpoint.interval = 10000 #execution.checkpoint.interval = 10000 #execution.checkpoint.data-uri = "hdfs://172.20.221.59:8020/seatunnel/checkpoint/" } source { Jdbc { url = "jdbc:mysql://xxxx:3306/xxxx" driver = "com.mysql.cj.jdbc.Driver" connection_check_timeout_sec = 100 user = "" password = "" query = "select * from action_log" partition_column = "id" partition_num = 10 } } sink { Maxcompute { accessId="" accesskey="" endpoint="http://odps-ext.aliyun-inc.com/api" project="xxx_dev" table_name="ods_za_schedule_action_log" partition_spec="pt=20230307000000" overwrite = true } } ``` ### Running Command ```shell $SEATUNNEL_HOME/bin/seatunnel.sh --config $SEATUNNEL_HOME/config/v2.batch.config.mysql2maxcompute -e local ``` ### Error Exception ```log 2023-03-08 17:51:30,495 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel_default_cluster-696258] [5.1] taskDone, taskId = 40000, taskGroup = TaskGroupLocation{jobId=685786618971291649, pipelineId=1, taskGroupId=30000} 2023-03-08 17:51:30,694 INFO com.hazelcast.internal.diagnostics.HealthMonitor - [localhost]:5801 [seatunnel_default_cluster-696258] [5.1] processors=2, physical.memory.total=15.3G, physical.memory.free=614.3M, swap.space.total=0, swap.space.free=0, heap.memory.used=321.7M, heap.memory.free=764.8M, heap.memory.total=1.1G, heap.memory.max=3.4G, heap.memory.used/total=29.61%, heap.memory.used/max=9.27%, minor.gc.count=19, minor.gc.time=379ms, major.gc.count=3, major.gc.time=253ms, load.process=0.00%, load.system=90.18%, load.systemAverage=1.36, thread.count=186, thread.peakCount=189, cluster.timeDiff=0, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0, executor.q.client.query.size=0, executor.q.client.blocking.size=0, executor.q.query.size=0, executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0, executor.q.operations.size=0, executor.q.priorityOperation.size=0, operations.completed.count=199, executor.q.mapLoad.size=0, executor.q.mapLoadAllKey s.size=0, executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0, operations.pending.invocations.percentage=0.00%, operations.pending.invocations.count=0, proxy.count=10, clientEndpoint.count=1, connection.active.count=0, client.connection.count=0, connection.count=0 2023-03-08 17:51:30,823 WARN org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel_default_cluster-696258] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@49f6d181 org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException: ErrorCode:[COMMON-11], ErrorDescription:[Sink writer operation failed, such as (open, close) etc...] - ErrorCode=Local Error, ErrorMessage=Block not exsits on server, block id is 201 at com.aliyun.odps.tunnel.TableTunnel$UploadSession.commit(TableTunnel.java:983) at org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeWriter.close(MaxcomputeWriter.java:85) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:133) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$4(SeaTunnelTask.java:316) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:313) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:173) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:538) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeWriter.close(MaxcomputeWriter.java:87) ~[connector-maxcompute-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:133) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$4(SeaTunnelTask.java:316) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT] at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_60] at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) ~[?:1.8.0_60] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_60] at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) ~[?:1.8.0_60] at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) ~[?:1.8.0_60] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_60] at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) ~[?:1.8.0_60] at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) ~[?:1.8.0_60] at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) ~[?:1.8.0_60] at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) ~[?:1.8.0_60] at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) ~[?:1.8.0_60] at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) ~[?:1.8.0_60] at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) ~[?:1.8.0_60] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:1.8.0_60] at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) ~[?:1.8.0_60] at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) ~[?:1.8.0_60] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:313) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:173) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:538) [seatunnel-starter.jar:2.3.1-SNAPSHOT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_60] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_60] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_60] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_60] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] 2023-03-08 17:51:30,831 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel_default_cluster-696258] [5.1] taskDone, taskId = 50000, taskGroup = TaskGroupLocation{jobId=685786618971291649, pipelineId=1, taskGroupId=30000} 2023-03-08 17:51:30,831 INFO org.apache.seatunnel.engine.server.TaskExecutionService - [localhost]:5801 [seatunnel_default_cluster-696258] [5.1] Task TaskGroupLocation{jobId=685786618971291649, pipelineId=1, taskGroupId=30000} complete with state FAILED 2023-03-08 17:51:30,833 INFO org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job mysql_maxcompute_test (685786618971291649), Pipeline: [(1/1)], task: [Jdbc-SourceTask (1/1)] turn to end state FAILED. 2023-03-08 17:51:30,833 ERROR org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job mysql_maxcompute_test (685786618971291649), Pipeline: [(1/1)], task: [Jdbc-SourceTask (1/1)] end with state FAILED and Exception: org.apache.seatunnel.connectors.seatunnel.maxcompute.exception.MaxcomputeConnectorException: ErrorCode:[COMMON-11], ErrorDescription:[Sink writer operation failed, such as (open, close) etc...] - ErrorCode=Local Error, ErrorMessage=Block not exsits on server, block id is 201 at com.aliyun.odps.tunnel.TableTunnel$UploadSession.commit(TableTunnel.java:983) at org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeWriter.close(MaxcomputeWriter.java:85) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:133) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$4(SeaTunnelTask.java:316) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:313) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:173) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:538) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) at org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeWriter.close(MaxcomputeWriter.java:87) at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:133) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$4(SeaTunnelTask.java:316) at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:313) at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:173) at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:538) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2023-03-08 17:51:30,834 ERROR org.apache.seatunnel.engine.server.dag.physical.SubPlan - Task TaskGroupLocation{jobId=685786618971291649, pipelineId=1, taskGroupId=30000} Failed in Job mysql_maxcompute_test (685786618971291649), Pipeline: [(1/1)], Begin to cancel other tasks in this pipeline. 2023-03-08 17:51:30,836 INFO org.apache.seatunnel.engine.server.dag.physical.SubPlan - Job mysql_maxcompute_test (685786618971291649), Pipeline: [(1/1)] turn from state RUNNING to CANCELING. 2023-03-08 17:51:30,834 ERROR org.apache.seatunnel.engine.server.task.SeaTunnelTask - Close FlowLifeCycle error. java.io.IOException: Stream is closed at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.checkError(HttpURLConnection.java:3472) ~[?:1.8.0_60] at sun.net.www.protocol.http.HttpURLConnection$StreamingOutputStream.write(HttpURLConnection.java:3461) ~[?:1.8.0_60] at org.apache.commons.io.output.ProxyOutputStream.write(ProxyOutputStream.java:92) ~[connector-maxcompute-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT] at com.google.protobuf.CodedOutputStream.refreshBuffer(CodedOutputStream.java:803) ~[connector-maxcompute-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT] at com.google.protobuf.CodedOutputStream.flush(CodedOutputStream.java:813) ~[connector-maxcompute-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT] at com.aliyun.odps.commons.proto.ProtobufRecordStreamWriter.close(ProtobufRecordStreamWriter.java:375) ~[connector-maxcompute-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT] at com.aliyun.odps.tunnel.io.TunnelRecordWriter.close(TunnelRecordWriter.java:85) ~[connector-maxcompute-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.connectors.seatunnel.maxcompute.sink.MaxcomputeWriter.close(MaxcomputeWriter.java:83) ~[connector-maxcompute-2.3.1-SNAPSHOT.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.close(SinkFlowLifeCycle.java:133) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.lambda$close$4(SeaTunnelTask.java:316) ~[seatunnel-starter.jar:2.3.1-SNAPSHOT] at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) [?:1.8.0_60] at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374) [?:1.8.0_60] at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) [?:1.8.0_60] at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291) [?:1.8.0_60] at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731) [?:1.8.0_60] at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_60] at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870) [?:1.8.0_60] at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467) [?:1.8.0_60] at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324) [?:1.8.0_60] at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405) [?:1.8.0_60] at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734) [?:1.8.0_60] at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160) [?:1.8.0_60] at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174) [?:1.8.0_60] at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) [?:1.8.0_60] at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418) [?:1.8.0_60] at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583) [?:1.8.0_60] at org.apache.seatunnel.engine.server.task.SeaTunnelTask.close(SeaTunnelTask.java:313) [seatunnel-starter.jar:2.3.1-SNAPSHOT] at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:554) [seatunnel-starter.jar:2.3.1-SNAPSHOT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_60] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_60] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_60] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_60] at java.lang.Thread.run(Thread.java:745) [?:1.8.0_60] ``` ### Flink or Spark Version with zeta engine ### Java or Scala Version java 8 ### Screenshots commit with not exists block id  i read the source code, there code may cause the problem; should sink commit block and record the block ids ? , i am not sure ~~~ https://github.com/apache/incubator-seatunnel/blob/dev/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/sink/MaxcomputeWriter.java  ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
