Larborator opened a new issue, #8990:
URL: https://github.com/apache/seatunnel/issues/8990

   ### Search before asking
   
   - [x] I had searched in the 
[issues](https://github.com/apache/seatunnel/issues?q=is%3Aissue+label%3A%22bug%22)
 and found no similar issues.
   
   
   ### What happened
   
The problem looks like https://github.com/apache/seatunnel/issues/6854, but I can't find a solution.
   
The version is 2.3.5, with https://github.com/apache/seatunnel/pull/6688 merged.
   
When I set sink.enable-2pc=false, the job runs fine.
   
But when I set sink.enable-2pc=true, the job fails; the full stack trace is in the Error Exception section below.
   
   ### SeaTunnel Version
   
2.3.5, with https://github.com/apache/seatunnel/pull/6688 merged
   
   ### SeaTunnel Config
   
   ```conf
   env {
     parallelism = ${TASK_NUM}
     job.mode = "BATCH"
     spark.app.name = "hive2bleem-template"
   }
   
   source {
   
     Hive {
       table_name = ${HIVE_TBL}
       metastore_uri = ${HIVE_URL}
       read_partitions = ${PARTS}
       read_columns = ${COLS}
     }
   }
   
   sink {
   
     Doris {
       table.identifier = ${BLEEM_TBL}
       fenodes = ${BLEEM_FE}
       username="root"
       password=""
       sink.label-prefix="s"
       sink.enable-2pc=${enable-2pc}
       doris.batch.size=200000
       doris.config {
         format = "json"
         read_json_by_line = "true"
       }
     }
   }
   ```
   
   ### Running Command
   
   ```shell
   start spark job command
   ```
   
   ### Error Exception
   
   ```log
   25/03/17 16:27:46 INFO InternalParquetRecordReader: Assembled and processed 
13847816 records from 13 columns in 77705 ms: 178.2101 rec/ms, 2316.7312 cell/ms
   25/03/17 16:27:46 INFO InternalParquetRecordReader: time spent so far 4% 
reading (3902 ms) and 95% processing (77705 ms)
   25/03/17 16:27:46 INFO InternalParquetRecordReader: at row 13847816. reading 
next block
   25/03/17 16:27:46 INFO InternalParquetRecordReader: block read in memory in 
455 ms. row count = 4088733
   25/03/17 16:28:06 INFO RetryExec: I/O exception (java.net.SocketException) 
caught when processing request to 
{}->http://dataarch-bjy-d5-154.idchb1az2.hb1.kwaidc.com:8040: Broken pipe 
(Write failed)
   25/03/17 16:28:09 ERROR DorisSinkWriter: stream load finished unexpectedly, 
interrupt worker thread! org.apache.http.client.ClientProtocolException
   25/03/17 16:28:09 ERROR Utils: Aborting task
   java.lang.RuntimeException: java.lang.InterruptedException
        at 
org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:57)
        at 
org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.writeRecord(DorisStreamLoad.java:194)
        at 
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:135)
        at 
org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.write(DorisSinkWriter.java:52)
        at 
org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:66)
        at 
org.apache.seatunnel.translation.spark.sink.writer.SparkDataWriter.write(SparkDataWriter.java:41)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1422)
        at 
org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
        at 
org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:95)
        at org.apache.spark.scheduler.Task.run(Task.scala:124)
        at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:495)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1388)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.lang.InterruptedException
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
        at 
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at 
org.apache.seatunnel.connectors.doris.sink.writer.RecordBuffer.write(RecordBuffer.java:93)
        at 
org.apache.seatunnel.connectors.doris.sink.writer.RecordStream.write(RecordStream.java:55)
        ... 19 more
   25/03/17 16:28:09 ERROR DataWritingSparkTask: Aborting commit for partition 
4 (task 4, attempt 0stage 0.0)
   25/03/17 16:28:09 INFO DorisStreamLoad: abort for labelSuffix 
s_distribute_shelf_zsm_2da1507d273144e384e7bdbe1c35aad6_4. start chkId 1.
   ```
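   For anyone triaging: the `InterruptedException` at the bottom of the trace is likely a secondary symptom. The stream-load HTTP request fails first (`Broken pipe` to the BE on port 8040), `DorisSinkWriter` then interrupts the worker thread, and the thread blocked in `ArrayBlockingQueue.take()` inside `RecordBuffer.write()` surfaces that interrupt as the wrapped `RuntimeException`. A minimal Java sketch of that mechanism (this is an illustration, not the actual SeaTunnel code):

   ```java
   import java.util.concurrent.ArrayBlockingQueue;

   public class InterruptDemo {
       // Returns true if a thread blocked in take() sees InterruptedException
       // once another thread interrupts it.
       static boolean interruptBlockedTake() throws InterruptedException {
           // Empty queue: take() blocks, like RecordBuffer.write -> ArrayBlockingQueue.take
           ArrayBlockingQueue<byte[]> bufferPool = new ArrayBlockingQueue<>(1);
           final boolean[] interrupted = {false};

           Thread writer = new Thread(() -> {
               try {
                   bufferPool.take(); // blocks until interrupted
               } catch (InterruptedException e) {
                   interrupted[0] = true; // SeaTunnel wraps this in a RuntimeException
               }
           });
           writer.start();
           Thread.sleep(100);  // let the writer reach the blocking take()
           writer.interrupt(); // what DorisSinkWriter does when stream load fails
           writer.join();
           return interrupted[0];
       }

       public static void main(String[] args) throws InterruptedException {
           System.out.println("interrupted = " + interruptBlockedTake());
       }
   }
   ```

   So the thing to investigate is why the stream-load connection breaks mid-write when 2pc is enabled, not the interrupt itself.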
   
   ### Zeta or Flink or Spark Version
   
   _No response_
   
   ### Java or Scala Version
   
   _No response_
   
   ### Screenshots
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   

