JNSimba opened a new pull request, #578:
URL: https://github.com/apache/doris-flink-connector/pull/578

   # Proposed changes
   
   Currently, we rely on the check-interval thread to periodically check 
whether the http thread of streamload is alive.
   This will cause the following problems:
   In extreme cases, if the data of a checkpoint is exactly equal to the size 
of bufferCount*bufferSize, the flink program will be stuck.
   
   jstack is as follows. At this time, the checkdone thread detects that it has 
entered prepareCommit and will not check.
   
   ```java
   "stream-load-upload-0-test_multi_failover_sink_tbl_multi_csv4-thread-1" 
Id=121 WAITING on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4f4e02f1
        at sun.misc.Unsafe.park(Native Method)
        -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4f4e02f1
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
        at 
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
        at 
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Sink: Writer -> Sink: Committer (2/2)#0" Id=79 WAITING on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@56ce1949
        at sun.misc.Unsafe.park(Native Method)
        -  waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@56ce1949
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
        at 
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
        at 
org.apache.doris.flink.sink.writer.RecordBuffer.stopBufferData(RecordBuffer.java:79)
        at 
org.apache.doris.flink.sink.writer.RecordStream.endInput(RecordStream.java:49)
        at 
org.apache.doris.flink.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:296)
        at 
org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:256)
   ```
   
   Modification method:
   When the http thread is initiated, when an error occurs, explicitly report 
the error. Immediately interrupt the thread to let the error be thrown
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at 
[[email protected]](mailto:[email protected]) by explaining why you 
chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to