This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 8d0cd03d534 [Fix](group commit) Fix cloud group commit be select
strategy (#39986)
8d0cd03d534 is described below
commit 8d0cd03d534ae9a3fa4ec179af0d011bd2b0430c
Author: abmdocrt <[email protected]>
AuthorDate: Wed Aug 28 23:10:44 2024 +0800
[Fix](group commit) Fix cloud group commit be select strategy (#39986)
## Proposed changes
In #35558, we optimized BE selection for group commit. However, we forgot
to apply this strategy to cloud mode. This PR applies it.
<!--Describe your changes.-->
---
.../org/apache/doris/httpv2/rest/LoadAction.java | 140 +++++++++++++--------
.../org/apache/doris/load/StreamLoadHandler.java | 8 +-
.../apache/doris/load/loadv2/MysqlLoadManager.java | 2 +-
3 files changed, 92 insertions(+), 58 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
index c2d50460ea4..b0a714da149 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
@@ -85,7 +85,7 @@ public class LoadAction extends RestBaseController {
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load",
method = RequestMethod.PUT)
public Object load(HttpServletRequest request, HttpServletResponse
response,
- @PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
+ @PathVariable(value = DB_KEY) String db, @PathVariable(value =
TABLE_KEY) String table) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
@@ -102,21 +102,29 @@ public class LoadAction extends RestBaseController {
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY +
"}/_stream_load", method = RequestMethod.PUT)
public Object streamLoad(HttpServletRequest request,
- HttpServletResponse response,
- @PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
- LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table,
getAllHeaders(request));
+ HttpServletResponse response,
+ @PathVariable(value = DB_KEY) String db, @PathVariable(value =
TABLE_KEY) String table) {
+ LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table,
getAllHeaders(request));
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
- if (groupCommitStr != null &&
groupCommitStr.equalsIgnoreCase("async_mode")) {
- groupCommit = true;
- try {
- if (isGroupCommitBlock(db, table)) {
- String msg = "insert table " + table +
GroupCommitPlanner.SCHEMA_CHANGE;
- return new RestBaseResult(msg);
+ if (groupCommitStr != null) {
+ if (!groupCommitStr.equalsIgnoreCase("async_mode") &&
!groupCommitStr.equalsIgnoreCase("sync_mode")
+ && !groupCommitStr.equalsIgnoreCase("off_mode")) {
+ return new RestBaseResult("Header `group_commit` can only be
`sync_mode`, `async_mode` or `off_mode`.");
+ }
+ if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
+ groupCommit = true;
+ if (groupCommitStr.equalsIgnoreCase("async_mode")) {
+ try {
+ if (isGroupCommitBlock(db, table)) {
+ String msg = "insert table " + table +
GroupCommitPlanner.SCHEMA_CHANGE;
+ return new RestBaseResult(msg);
+ }
+ } catch (Exception e) {
+ LOG.info("exception:" + e);
+ return new RestBaseResult(e.getMessage());
+ }
}
- } catch (Exception e) {
- LOG.info("exception:" + e);
- return new RestBaseResult(e.getMessage());
}
}
if (needRedirect(request.getScheme())) {
@@ -147,21 +155,32 @@ public class LoadAction extends RestBaseController {
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
- if (groupCommitStr != null &&
groupCommitStr.equalsIgnoreCase("async_mode")) {
- groupCommit = true;
- try {
- String[] pair = parseDbAndTb(sql);
- Database db = Env.getCurrentInternalCatalog()
- .getDbOrException(pair[0], s -> new
TException("database is invalid for dbName: " + s));
- Table tbl = db.getTableOrException(pair[1], s -> new
TException("table is invalid: " + s));
- tableId = tbl.getId();
- if (isGroupCommitBlock(pair[0], pair[1])) {
- String msg = "insert table " + pair[1] +
GroupCommitPlanner.SCHEMA_CHANGE;
- return new RestBaseResult(msg);
+ if (groupCommitStr != null) {
+ if (!groupCommitStr.equalsIgnoreCase("async_mode") &&
!groupCommitStr.equalsIgnoreCase("sync_mode")
+ && !groupCommitStr.equalsIgnoreCase("off_mode")) {
+ return new RestBaseResult("Header `group_commit` can only be
`sync_mode`, `async_mode` or `off_mode`.");
+ }
+ if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
+ try {
+ groupCommit = true;
+ String[] pair = parseDbAndTb(sql);
+ Database db = Env.getCurrentInternalCatalog()
+ .getDbOrException(pair[0], s -> new
TException("database is invalid for dbName: " + s));
+ Table tbl = db.getTableOrException(pair[1], s -> new
TException("table is invalid: " + s));
+ tableId = tbl.getId();
+
+ // async mode needs to write WAL, we need to block load
during waiting WAL.
+ if (groupCommitStr.equalsIgnoreCase("async_mode")) {
+ if (isGroupCommitBlock(pair[0], pair[1])) {
+ String msg = "insert table " + pair[1] +
GroupCommitPlanner.SCHEMA_CHANGE;
+ return new RestBaseResult(msg);
+ }
+
+ }
+ } catch (Exception e) {
+ LOG.info("exception:" + e);
+ return new RestBaseResult(e.getMessage());
}
- } catch (Exception e) {
- LOG.info("exception:" + e);
- return new RestBaseResult(e.getMessage());
}
}
executeCheckPassword(request, response);
@@ -223,8 +242,8 @@ public class LoadAction extends RestBaseController {
@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method =
RequestMethod.PUT)
public Object streamLoad2PC(HttpServletRequest request,
- HttpServletResponse response,
- @PathVariable(value = DB_KEY) String db) {
+ HttpServletResponse response,
+ @PathVariable(value = DB_KEY) String db) {
LOG.info("streamload action 2PC, db: {}, headers: {}", db,
getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
@@ -236,9 +255,9 @@ public class LoadAction extends RestBaseController {
@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY +
"}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC_table(HttpServletRequest request,
- HttpServletResponse response,
- @PathVariable(value = DB_KEY) String db,
- @PathVariable(value = TABLE_KEY) String
table) {
+ HttpServletResponse response,
+ @PathVariable(value = DB_KEY) String db,
+ @PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db,
table, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
@@ -382,7 +401,7 @@ public class LoadAction extends RestBaseController {
if (Strings.isNullOrEmpty(cloudClusterName)) {
throw new LoadException("No cloud cluster name selected.");
}
- return selectCloudRedirectBackend(cloudClusterName, request,
groupCommit);
+ return selectCloudRedirectBackend(cloudClusterName, request,
groupCommit, tableId);
} else {
return selectLocalRedirectBackend(groupCommit, request, tableId);
}
@@ -409,21 +428,7 @@ public class LoadAction extends RestBaseController {
throw new
LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " +
policy);
}
if (groupCommit) {
- ConnectContext ctx = new ConnectContext();
- ctx.setEnv(Env.getCurrentEnv());
- ctx.setThreadLocalInfo();
- ctx.setRemoteIP(request.getRemoteAddr());
- // We set this variable to fulfill required field 'user' in
- // TMasterOpRequest(FrontendService.thrift)
- ctx.setQualifiedUser(Auth.ADMIN_USER);
- ctx.setThreadLocalInfo();
-
- try {
- backend = Env.getCurrentEnv().getGroupCommitManager()
- .selectBackendForGroupCommit(tableId, ctx, false);
- } catch (DdlException e) {
- throw new RuntimeException(e);
- }
+ backend = selectBackendForGroupCommit("", request, tableId, false);
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
}
@@ -433,9 +438,15 @@ public class LoadAction extends RestBaseController {
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}
- private TNetworkAddress selectCloudRedirectBackend(String clusterName,
HttpServletRequest req, boolean groupCommit)
+ private TNetworkAddress selectCloudRedirectBackend(String clusterName,
HttpServletRequest req, boolean groupCommit,
+ long tableId)
throws LoadException {
- Backend backend = StreamLoadHandler.selectBackend(clusterName,
groupCommit);
+ Backend backend = null;
+ if (groupCommit) {
+ backend = selectBackendForGroupCommit(clusterName, req, tableId,
true);
+ } else {
+ backend = StreamLoadHandler.selectBackend(clusterName);
+ }
String redirectPolicy =
req.getHeader(LoadAction.HEADER_REDIRECT_POLICY);
// User specified redirect policy
@@ -443,7 +454,7 @@ public class LoadAction extends RestBaseController {
return new TNetworkAddress(backend.getHost(),
backend.getHttpPort());
}
redirectPolicy = redirectPolicy == null || redirectPolicy.isEmpty()
- ? Config.streamload_redirect_policy : redirectPolicy;
+ ? Config.streamload_redirect_policy : redirectPolicy;
Pair<String, Integer> publicHostPort = null;
Pair<String, Integer> privateHostPort = null;
@@ -563,7 +574,7 @@ public class LoadAction extends RestBaseController {
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String
db,
- String table, boolean isStreamLoad) {
+ String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
@@ -647,4 +658,29 @@ public class LoadAction extends RestBaseController {
}
return headers.toString();
}
+
+ private Backend selectBackendForGroupCommit(String clusterName,
HttpServletRequest req, long tableId,
+ boolean isCloud)
+ throws LoadException {
+ ConnectContext ctx = new ConnectContext();
+ ctx.setEnv(Env.getCurrentEnv());
+ ctx.setThreadLocalInfo();
+ ctx.setRemoteIP(req.getRemoteAddr());
+ // We set this variable to fulfill required field 'user' in
+ // TMasterOpRequest(FrontendService.thrift)
+ ctx.setQualifiedUser(Auth.ADMIN_USER);
+ ctx.setThreadLocalInfo();
+ if (isCloud) {
+ ctx.setCloudCluster(clusterName);
+ }
+
+ Backend backend = null;
+ try {
+ backend = Env.getCurrentEnv().getGroupCommitManager()
+ .selectBackendForGroupCommit(tableId, ctx, isCloud);
+ } catch (DdlException e) {
+ throw new LoadException(e.getMessage(), e);
+ }
+ return backend;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
index 0f44ec3f785..9e51e2cffd0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/StreamLoadHandler.java
@@ -84,13 +84,12 @@ public class StreamLoadHandler {
* Select a random backend in the given cloud cluster.
*
* @param clusterName cloud cluster name
- * @param groupCommit if this selection is for group commit
* @throws LoadException if there is no available backend
*/
- public static Backend selectBackend(String clusterName, boolean
groupCommit) throws LoadException {
+ public static Backend selectBackend(String clusterName) throws
LoadException {
List<Backend> backends = ((CloudSystemInfoService)
Env.getCurrentSystemInfo())
.getBackendsByClusterName(clusterName)
- .stream().filter(be -> be.isAlive() && (!groupCommit ||
groupCommit && !be.isDecommissioned()))
+ .stream().filter(Backend::isAlive)
.collect(Collectors.toList());
if (backends.isEmpty()) {
@@ -101,8 +100,7 @@ public class StreamLoadHandler {
// TODO: add a more sophisticated algorithm to select backend
SecureRandom rand = new SecureRandom();
int randomIndex = rand.nextInt(backends.size());
- Backend backend = backends.get(randomIndex);
- return backend;
+ return backends.get(randomIndex);
}
public void setCloudCluster() throws UserException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
index 7b156aeff8f..975b1ca1175 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java
@@ -447,7 +447,7 @@ public class MysqlLoadManager {
private String selectBackendForMySqlLoad(String database, String table)
throws LoadException {
Backend backend = null;
if (Config.isCloudMode()) {
- backend =
StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster(), false);
+ backend =
StreamLoadHandler.selectBackend(ConnectContext.get().getCloudCluster());
} else {
BeSelectionPolicy policy = new
BeSelectionPolicy.Builder().needLoadAvailable().build();
List<Long> backendIds =
Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]