JNSimba commented on code in PR #193:
URL: https://github.com/apache/doris-flink-connector/pull/193#discussion_r1325590958
##########
flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java:
##########
@@ -182,23 +185,38 @@ private void checkDone() {
return;
}
- // double check to interrupt when loading is true and dorisStreamLoad.getPendingLoadFuture().isDone
- // fix issue #139
+ // double-check the future, to avoid getting the old future
if (dorisStreamLoad.getPendingLoadFuture() != null
&& dorisStreamLoad.getPendingLoadFuture().isDone()) {
- // TODO: introduce cache for reload instead of throwing exceptions.
- String errorMsg;
- try {
- RespContent content = dorisStreamLoad.handlePreCommitResponse(dorisStreamLoad.getPendingLoadFuture().get());
- errorMsg = content.getMessage();
- } catch (Exception e) {
- errorMsg = e.getMessage();
+ // error happened when loading, now we should stop receive data
+ // and abort previous txn(stream load) and start a new txn(stream load)
+ // use send cached data to new txn, then notify to restart the stream
+ if (executionOptions.isUseCache()) {
+ try {
+ this.dorisStreamLoad.setHostPort(RestService.getBackend(dorisOptions, dorisReadOptions, LOG));
Review Comment:
Maybe we could call backendUtil.getAvailableBackend here instead?
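
To illustrate the idea, here is a minimal, self-contained sketch of a selector that rotates through known backends and returns the first one that passes a liveness probe. The class `BackendSelector`, its constructor, and the probe are hypothetical stand-ins written for this comment; they are not the connector's actual `BackendUtil` implementation, and only the method name `getAvailableBackend` is borrowed from the suggestion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical stand-in for a backend picker; not the connector's real BackendUtil. */
final class BackendSelector {
    private final List<String> backends;      // "host:port" entries
    private final Predicate<String> isAlive;  // pluggable liveness probe (assumption)
    private int pos = 0;                      // next index to try, round-robin

    BackendSelector(List<String> backends, Predicate<String> isAlive) {
        if (backends == null || backends.isEmpty()) {
            throw new IllegalArgumentException("at least one backend is required");
        }
        this.backends = new ArrayList<>(backends);
        this.isAlive = isAlive;
    }

    /**
     * Tries each backend at most once, starting where the previous call stopped,
     * and returns the first one the probe accepts.
     */
    String getAvailableBackend() {
        for (int i = 0; i < backends.size(); i++) {
            String candidate = backends.get(pos);
            pos = (pos + 1) % backends.size();
            if (isAlive.test(candidate)) {
                return candidate;
            }
        }
        throw new IllegalStateException("no available backend");
    }
}
```

With something along these lines, the retry path could write `dorisStreamLoad.setHostPort(backendUtil.getAvailableBackend())`, so that a backend that just failed is skipped rather than picked again.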
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]