john8628 opened a new issue, #3712: URL: https://github.com/apache/incubator-seatunnel/issues/3712
### Search before asking - [X] I had searched in the [issues](https://github.com/apache/incubator-seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues. ### What happened use seatunnel(latest dev) load data from s3 file to clickhouse cluster error ; only one shard have the data; clickhouse clusters shard : 3 replica: 2 shard 1: ck1,ck1_1 shard 2: ck2,ck2_2 shard 3: ck3,ck3_3 distribute create table ddl: CREATE TABLE test.cycle_base_label_string_dist ( `code` String COMMENT 'xxxx', `value` String COMMENT 'xxx', `dt` Date, `id` UInt32 ) ENGINE = Distributed('dmppre_ck', 'test', 'cycle_base_label_string_local', rand() local table create table ddl : CREATE TABLE test.cycle_base_label_string_local ( `code` String COMMENT 'xxx', `value` String COMMENT 'xxx', `dt` Date, `id` UInt32 ) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/test/cycle_base_label_string_local', '{replica}') PARTITION BY dt ORDER BY code TTL dt + toIntervalDay(4) SETTINGS storage_policy = 'multi_disks', index_granularity = 8192; ### SeaTunnel Version dev ### SeaTunnel Config ```conf env { # You can set spark configuration here execution.parallelism = 8 job.mode = "BATCH" } source { S3File { path = "/xxx/audc_user_label_bigint_inner/dt=20221209/label_name=show_cnt" esult_table_name = "fake" bucket = "s3://xxx" type = "parquet" } } transform{ sql { sql = "select label_name as code,id,'2022-12-12' as dt,label_value as value from fake " } } sink{ Clickhouse { host = "ck1:8123,ck2:8123,ck3:8123" database = "test" table = "cycle_base_label_string_dist" bulk_size=200000 split_mode = true sharding_key = "id" fields = [ "code", "value", "dt", "id" ] username = "default" password = "" } } ``` ### Running Command ```shell nohup bash bin/start-seatunnel-spark-connector-v2.sh --master yarn --deploy-mode client --config ./config/s32clickhouseBatch.template ``` ### Error Exception ```log 22/12/13 03:06:23 ERROR WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.SparkDataSourceWriter@2dc72c0c aborted. 22/12/13 03:06:23 ERROR SparkApiTaskExecuteCommand: Run SeaTunnel on spark failed. org.apache.spark.SparkException: Writing job aborted. at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:173) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:169) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:197) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:194) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:114) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:112) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677) at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94) at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141) at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:261) at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:85) at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:59) at org.apache.seatunnel.core.starter.spark.command.SparkApiTaskExecuteCommand.execute(SparkApiTaskExecuteCommand.java:55) at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:39) at org.apache.seatunnel.core.starter.spark.SeatunnelSpark.main(SeatunnelSpark.java:34) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.6 in stage 0.0 (TID 6, ip-10-20-48-185.eu-west-1.compute.internal, executor 67): ExecutorLostFailure (executor 67 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 155866 ms Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64) ... 36 more 22/12/13 03:06:23 ERROR Seatunnel: =============================================================================== 22/12/13 03:06:23 ERROR Seatunnel: Fatal Error, 22/12/13 03:06:23 ERROR Seatunnel: Please submit bug report in https://github.com/apache/incubator-seatunnel/issues 22/12/13 03:06:23 ERROR Seatunnel: Reason:Writing job aborted. 22/12/13 03:06:23 ERROR Seatunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Writing job aborted. at org.apache.seatunnel.core.starter.spark.command.SparkApiTaskExecuteCommand.execute(SparkApiTaskExecuteCommand.java:58) at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:39) at org.apache.seatunnel.core.starter.spark.SeatunnelSpark.main(SeatunnelSpark.java:34) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 22/12/13 03:06:23 ERROR Seatunnel: =============================================================================== Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Writing job aborted. at org.apache.seatunnel.core.starter.spark.command.SparkApiTaskExecuteCommand.execute(SparkApiTaskExecuteCommand.java:58) at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:39) at org.apache.seatunnel.core.starter.spark.SeatunnelSpark.main(SeatunnelSpark.java:34) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52) at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:853) at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161) at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184) at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86) at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:928) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:937) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 22/12/13 03:06:23 INFO SparkContext: Invoking stop() from shutdown hook ``` ### Flink or Spark Version Spark version 2.4.6 ### Java or Scala Version _No response_ ### Screenshots  ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
