imay closed pull request #354: Avoid 'No more data to read' error when handling stream load RPC URL: https://github.com/apache/incubator-doris/pull/354
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/fe/src/main/java/org/apache/doris/common/UserException.java b/fe/src/main/java/org/apache/doris/common/UserException.java index 7f04a08f..7add338f 100644 --- a/fe/src/main/java/org/apache/doris/common/UserException.java +++ b/fe/src/main/java/org/apache/doris/common/UserException.java @@ -17,24 +17,26 @@ package org.apache.doris.common; +import com.google.common.base.Strings; + /** * Thrown for internal server errors. */ public class UserException extends Exception { public UserException(String msg, Throwable cause) { - super(msg, cause); + super(Strings.nullToEmpty(msg), cause); } public UserException(Throwable cause) { super(cause); } - public UserException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); + public UserException(String msg, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(Strings.nullToEmpty(msg), cause, enableSuppression, writableStackTrace); } public UserException(String msg) { - super(msg); + super(Strings.nullToEmpty(msg)); } } diff --git a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 9c6d2668..72ca0f9c 100644 --- a/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -396,11 +396,11 @@ public TFeResult miniLoad(TMiniLoadRequest request) throws TException { } catch (UserException e) { LOG.warn("add mini load error", e); status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); } catch (Throwable e) { LOG.warn("unexpected exception when adding mini load", e); status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); } finally { ConnectContext.remove(); } @@ -465,7 +465,7 @@ public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request String failMsg = "job does not exist. id: " + jobId; LOG.warn(failMsg); status.setStatus_code(TStatusCode.CANCELLED); - status.setError_msgs(Lists.newArrayList(failMsg)); + status.addToError_msgs(failMsg); return result; } @@ -474,7 +474,7 @@ public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request String failMsg = "task info does not exist. task id: " + taskId + ", job id: " + jobId; LOG.warn(failMsg); status.setStatus_code(TStatusCode.CANCELLED); - status.setError_msgs(Lists.newArrayList(failMsg)); + status.addToError_msgs(failMsg); return result; } @@ -554,12 +554,12 @@ public TFeResult loadCheck(TLoadCheckRequest request) throws TException { request.getTbl(), request.getUser_ip(), PrivPredicate.LOAD); } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); return result; } catch (Throwable e) { LOG.warn("catch unknown result.", e); status.setStatus_code(TStatusCode.INTERNAL_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); return result; } @@ -576,11 +576,17 @@ public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TEx result.setTxnId(loadTxnBeginImpl(request)); } catch (LabelAlreadyExistsException e) { status.setStatus_code(TStatusCode.LABEL_ALREADY_EXISTS); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); - status.setError_msgs(Lists.newArrayList(e.getMessage())); + status.addToError_msgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); + return result; } + return result; } @@ -624,12 +630,16 @@ public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws if (!loadTxnCommitImpl(request)) { // committed success but not visible status.setStatus_code(TStatusCode.PUBLISH_TIMEOUT); - status.setError_msgs( - Lists.newArrayList("transaction commit successfully, BUT data will be visible later")); + status.addToError_msgs("transaction commit successfully, BUT data will be visible later"); } } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); status.addToError_msgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); + return result; } return result; } @@ -674,6 +684,11 @@ public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) t } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); status.addToError_msgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); + return result; } return result; @@ -704,6 +719,11 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) throws } catch (UserException e) { status.setStatus_code(TStatusCode.ANALYSIS_ERROR); status.addToError_msgs(e.getMessage()); + } catch (Throwable e) { + LOG.warn("catch unknown result.", e); + status.setStatus_code(TStatusCode.INTERNAL_ERROR); + status.addToError_msgs(Strings.nullToEmpty(e.getMessage())); + return result; } return result; } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org For additional commands, e-mail: dev-h...@doris.apache.org