Larborator opened a new issue, #8990:
URL: https://github.com/apache/seatunnel/issues/8990
### Search before asking

- [x] I had searched in the [issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22) and found no similar issues.

### What happened

The problem looks like https://github.com/apache/seatunnel/issues/6854, but I can't find a solution there. The version is 2.3.5 with https://github.com/apache/seatunnel/pull/6688 merged.

When I set `sink.enable-2pc=false`, the job runs fine. But when I set `sink.enable-2pc=true`, the job fails with the exception below (abridged here; the full log is in the Error Exception section):

```
25/03/17 16:28:06 INFO RetryExec: I/O exception (java.net.SocketException) caught when processing request to {}->http://dataarch-bjy-d5-154.idchb1az2.hb1.kwaidc.com:8040: Broken pipe (Write failed)
25/03/17 16:28:09 ERROR DorisSinkWriter: stream load finished unexpectedly, interrupt worker thread!
org.apache.http.client.ClientProtocolException
25/03/17 16:28:09 ERROR Utils: Aborting task
java.lang.RuntimeException: java.lang.InterruptedException
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:57)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135)
	...
Caused by: java.lang.InterruptedException
	at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.write(RecordBuffer.java:93)
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:55)
	... 19 more
```
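To make the failure pattern easier to follow, here is a minimal Java sketch of the producer/consumer hand-off the stack trace points at (`RecordBuffer`/`RecordStream` feeding a background stream-load thread). This is a simplification for illustration, not the connector's actual code; all names in it are hypothetical:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Sketch of the pattern visible in the stack trace: the Spark task thread
 * blocks in a bounded buffer while a background thread drains it into the
 * Doris stream-load HTTP request. When that request dies (Broken pipe ->
 * ClientProtocolException), the background side interrupts the blocked
 * writer, so the surfaced error is an InterruptedException that hides the
 * real I/O failure.
 */
public class StreamLoadInterruptSketch {
    private final BlockingQueue<byte[]> buffer = new ArrayBlockingQueue<>(3);
    private volatile Thread writerThread;

    /** Called on the Spark task thread, analogous to RecordBuffer.write. */
    public void write(byte[] record) {
        writerThread = Thread.currentThread();
        try {
            buffer.put(record); // blocks once the load thread stops draining
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // The wrapping seen in the log: the root HTTP failure is lost here.
            throw new RuntimeException(e);
        }
    }

    /** Runs on a background thread, analogous to the stream-load executor. */
    public void loadLoop() {
        try {
            while (true) {
                sendOverHttp(buffer.take()); // may fail with Broken pipe
            }
        } catch (Exception httpFailure) {
            // "stream load finished unexpectedly, interrupt worker thread!"
            Thread w = writerThread;
            if (w != null) {
                w.interrupt();
            }
        }
    }

    private void sendOverHttp(byte[] chunk) throws Exception {
        // placeholder for the real chunked HTTP write
    }
}
```

Read this way, the `ClientProtocolException` logged just before the interrupt appears to be the root failure, while the `InterruptedException` is only the writer being told to stop.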
### SeaTunnel Version

2.3.5, with https://github.com/apache/seatunnel/pull/6688 merged.

### SeaTunnel Config

```conf
env {
  parallelism = ${TASK_NUM}
  job.mode = "BATCH"
  spark.app.name = "hive2bleem-template"
}

source {
  Hive {
    table_name = ${HIVE_TBL}
    metastore_uri = ${HIVE_URL}
    read_partitions = ${PARTS}
    read_columns = ${COLS}
  }
}

sink {
  Doris {
    table.identifier = ${BLEEM_TBL}
    fenodes = ${BLEEM_FE}
    username = "root"
    password = ""
    sink.label-prefix = "s"
    sink.enable-2pc = ${enable-2pc}
    doris.batch.size = 200000
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
```
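For context (an addition for illustration, not part of the original report): with `sink.enable-2pc=true` the Doris sink is expected to run each stream load as a pre-committed transaction and then commit or abort it in a second phase over Doris's stream-load 2PC HTTP endpoint. A rough Java sketch of that second-phase call, assuming a stock Doris FE exposing the `_stream_load_2pc` endpoint described in the Doris documentation (host, database, table, and credentials below are placeholders):

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Hypothetical illustration of the second phase of Doris's stream-load 2PC,
 * which sink.enable-2pc=true relies on. Endpoint and header names follow the
 * Doris stream-load docs; verify them against your Doris version.
 */
public class Doris2pcSketch {
    public static void finishTxn(String feHostPort, String db, String table,
                                 long txnId, String operation) throws Exception {
        // operation is "commit" or "abort"
        URL url = new URL("http://" + feHostPort + "/api/" + db + "/" + table
                + "/_stream_load_2pc");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("PUT");
        String auth = Base64.getEncoder()
                .encodeToString("root:".getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + auth);
        conn.setRequestProperty("txn_id", String.valueOf(txnId));
        conn.setRequestProperty("txn_operation", operation);
        try (InputStream in = conn.getInputStream()) {
            // the JSON response body reports the transaction status
        } finally {
            conn.disconnect();
        }
    }
}
```

The `abort for labelSuffix ...` line at the end of the log below is presumably the connector taking the abort side of this second phase after the write failed.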
### Running Command

```shell
start spark job command
```

### Error Exception

```log
25/03/17 16:27:46 INFO InternalParquetRecordReader: Assembled and processed 13847816 records from 13 columns in 77705 ms: 178.2101 rec/ms, 2316.7312 cell/ms
25/03/17 16:27:46 INFO InternalParquetRecordReader: time spent so far 4% reading (3902 ms) and 95% processing (77705 ms)
25/03/17 16:27:46 INFO InternalParquetRecordReader: at row 13847816. reading next block
25/03/17 16:27:46 INFO InternalParquetRecordReader: block read in memory in 455 ms. row count = 4088733
25/03/17 16:28:06 INFO RetryExec: I/O exception (java.net.SocketException) caught when processing request to {}->http://dataarch-bjy-d5-154.idchb1az2.hb1.kwaidc.com:8040: Broken pipe (Write failed)
25/03/17 16:28:09 ERROR DorisSinkWriter: stream load finished unexpectedly, interrupt worker thread!
org.apache.http.client.ClientProtocolException
25/03/17 16:28:09 ERROR Utils: Aborting task
java.lang.RuntimeException: java.lang.InterruptedException
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:57)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135)
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:52)
	at org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:66)
	at org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:41)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1422)
	at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
	at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:95)
	at org.apache.spark.scheduler.Task.run(Task.scala:124)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:495)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1388)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
	at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.write(RecordBuffer.java:93)
	at org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:55)
	... 19 more
25/03/17 16:28:09 ERROR DataWritingSparkTask: Aborting commit for partition 4 (task 4, attempt 0, stage 0.0)
25/03/17 16:28:09 INFO DorisStreamLoad: abort for labelSuffix s_distribute_shelf_zsm_2da1507d273144e384e7bdbe1c35aad6_4. start chkId 1.
```

### Zeta or Flink or Spark Version

_No response_

### Java or Scala Version

_No response_

### Screenshots

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [x] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
