mymeiyi commented on code in PR #63594:
URL: https://github.com/apache/doris/pull/63594#discussion_r3296908189
##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -2779,13 +2784,86 @@ private void recordFinishedLoadJobRequestImpl(String
label, long txnId, String d
EtlJobType.INSERT, createTime, failMsg, trackingUrl,
firstErrorMsg, userIdentity, -1);
}
+ private static int nextGroupCommitFollowerIndex(int followerCount) {
+ return Math.floorMod(GROUP_COMMIT_FOLLOWER_INDEX.getAndIncrement(),
followerCount);
+ }
+
+ private TStreamLoadPutResult
forwardGroupCommitStreamLoad(TStreamLoadPutRequest request) {
+ HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
+ List<Frontend> followers =
Env.getCurrentEnv().getFrontends(FrontendNodeType.FOLLOWER).stream()
+ .filter(fe -> fe.isAlive() &&
!(fe.getHost().equals(selfNode.getHost())
+ && fe.getEditLogPort() == selfNode.getPort())).collect(
+ Collectors.toList());
+ if (CollectionUtils.isEmpty(followers)) {
+ return null;
+ }
+
+ // check table enable light_schema_change and group commit does not
block for schema change
+ TStreamLoadPutResult result = new TStreamLoadPutResult();
+ TStatus status = new TStatus(TStatusCode.OK);
+ result.setStatus(status);
+ try {
+ Database db =
Env.getCurrentInternalCatalog().getDbOrDdlException(request.getDb());
+ OlapTable table = (OlapTable)
db.getTableOrDdlException(request.getTbl());
+ if (!table.getTableProperty().getUseSchemaLightChange()) {
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(
+ "table light_schema_change is false, can't do stream
load with group commit mode");
+ return result;
+ }
+ if
(Env.getCurrentEnv().getGroupCommitManager().isBlock(table.getId())) {
+ String msg = "insert table " + table.getId() +
GroupCommitPlanner.SCHEMA_CHANGE;
+ LOG.info(msg);
+ status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
+ status.addToErrorMsgs(msg);
+ return result;
+ }
+ } catch (Exception e) {
+ LOG.warn("failed to pre-check group commit stream load, fallback
to local. db={}, tbl={}",
+ request.getDb(), request.getTbl(), e);
+ return null;
+ }
+
+ int idx = nextGroupCommitFollowerIndex(followers.size());
+ Frontend follower = followers.get(idx);
+ TNetworkAddress address = new TNetworkAddress(follower.getHost(),
follower.getRpcPort());
+ LOG.info("forward group commit stream load put to follower {}, db={},
tbl={}, groupCommitMode={}",
+ address, request.getDb(), request.getTbl(),
request.getGroupCommitMode());
+ FrontendService.Client client = null;
+ boolean ok = false;
+ try {
+ client = ClientPool.frontendPool.borrowObject(address);
+ TStreamLoadPutResult streamLoadPutResult =
client.streamLoadPut(request);
+ ok = true;
Review Comment:
`StreamLoadHandler.generatePlan()` does not call
`GroupCommitManager.selectBackendForGroupCommit()`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]