JNSimba opened a new pull request, #578:
URL: https://github.com/apache/doris-flink-connector/pull/578
# Proposed changes
Currently, we rely on the check-interval thread to periodically check
whether the http thread of streamload is alive.
This will cause the following problems:
In extreme cases, if the data of a checkpoint is exactly equal to the size
of bufferCount*bufferSize, the flink program will be stuck.
jstack is as follows. At this time, the checkdone thread detects that it has
entered prepareCommit and will not check.
```java
"stream-load-upload-0-test_multi_failover_sink_tbl_multi_csv4-thread-1"
Id=121 WAITING on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4f4e02f1
at sun.misc.Unsafe.park(Native Method)
- waiting on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@4f4e02f1
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Sink: Writer -> Sink: Committer (2/2)#0" Id=79 WAITING on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@56ce1949
at sun.misc.Unsafe.park(Native Method)
- waiting on
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@56ce1949
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2044)
at
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:403)
at
org.apache.doris.flink.sink.writer.RecordBuffer.stopBufferData(RecordBuffer.java:79)
at
org.apache.doris.flink.sink.writer.RecordStream.endInput(RecordStream.java:49)
at
org.apache.doris.flink.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:296)
at
org.apache.doris.flink.sink.writer.DorisWriter.prepareCommit(DorisWriter.java:256)
```
Modification method:
When the http thread is initiated, when an error occurs, explicitly report
the error. Immediately interrupt the thread to let the error be thrown
## Checklist(Required)
1. Does it affect the original behavior: (Yes/No/I Don't know)
2. Has unit tests been added: (Yes/No/No Need)
3. Has document been added or modified: (Yes/No/No Need)
4. Does it need to update dependencies: (Yes/No)
5. Are there any changes that cannot be rolled back: (Yes/No)
## Further comments
If this is a relatively large or complex change, kick off the discussion at
[[email protected]](mailto:[email protected]) by explaining why you
chose the solution you did and what alternatives you considered, etc...
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]