This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7be5373ed2b0fc6b60119f85d9248e56de8a8727 Author: Siyang Tang <[email protected]> AuthorDate: Thu Jul 6 10:25:38 2023 +0800 [enhancement](query) enable strong consistency by syncing max journal id from master (#21205) Add a session var & config enable_strong_consistency_read to solve the problem that loading results may be briefly invisible to followers, to meet users' requirements in strong-consistency read scenarios. Will sync max journal id from master and wait for replaying. --- docs/en/docs/advanced/variables.md | 4 + docs/zh-CN/docs/advanced/variables.md | 4 + .../java/org/apache/doris/qe/MasterOpExecutor.java | 107 ++++++++++++++------- .../java/org/apache/doris/qe/SessionVariable.java | 16 +++ .../java/org/apache/doris/qe/StmtExecutor.java | 9 ++ .../apache/doris/service/FrontendServiceImpl.java | 5 + gensrc/thrift/FrontendService.thrift | 1 + 7 files changed, 110 insertions(+), 36 deletions(-) diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index 682b390ae5..02d863c812 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -659,6 +659,10 @@ Translated with www.DeepL.com/Translator (free version) Controls whether to use lazy materialization technology in orc reader. The default value is true. +* `enable_strong_consistency_read` + + Used to enable strong consistent reading. By default, Doris supports strong consistency within the same session, that is, changes to data within the same session are visible in real time. If you want strong consistent reads between sessions, set this variable to true. 
+ *** #### Supplementary instructions on statement execution timeout control diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 0cbcb57280..c7bbe3c0c7 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -645,6 +645,10 @@ try (Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/ 控制 orc reader 是否启用延迟物化技术。默认为 true。 +* `enable_strong_consistency_read` + + 用以开启强一致读。Doris 默认支持同一个会话内的强一致性,即同一个会话内对数据的变更操作是实时可见的。如需要会话间的强一致读,则需将此变量设置为true。 + *** #### 关于语句执行超时控制的补充说明 diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java index 9bbbd598d6..ba8af0acfb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java @@ -20,6 +20,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.RedirectStatus; import org.apache.doris.catalog.Env; import org.apache.doris.common.ClientPool; +import org.apache.doris.common.DdlException; import org.apache.doris.common.telemetry.Telemetry; import org.apache.doris.thrift.FrontendService; import org.apache.doris.thrift.TMasterOpRequest; @@ -68,24 +69,40 @@ public class MasterOpExecutor { this.shouldNotRetry = !isQuery; } + /** + * used for simply syncing journal with master under strong consistency mode + */ + public MasterOpExecutor(ConnectContext ctx) { + this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true); + } + public void execute() throws Exception { Span forwardSpan = ctx.getTracer().spanBuilder("forward").setParent(Context.current()) .startSpan(); try (Scope ignored = forwardSpan.makeCurrent()) { - forward(); + result = forward(buildStmtForwardParams()); } catch (Exception e) { forwardSpan.recordException(e); throw e; } finally { forwardSpan.end(); } + waitOnReplaying(); + } + + public void syncJournal() throws Exception { + result = 
forward(buildSyncJournalParmas()); + waitOnReplaying(); + } + + private void waitOnReplaying() throws DdlException { LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId); ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, waitTimeoutMs); } // Send request to Master - private void forward() throws Exception { + private TMasterOpResult forward(TMasterOpRequest params) throws Exception { if (!ctx.getEnv().isReady()) { throw new Exception("Node catalog is not ready, please wait for a while."); } @@ -100,7 +117,52 @@ public class MasterOpExecutor { // may throw NullPointerException. add err msg throw new Exception("Failed to get master client.", e); } + final StringBuilder forwardMsg = new StringBuilder(String.format("forward to Master %s", thriftAddress)); + if (!params.isSyncJournalOnly()) { + forwardMsg.append(", statement: %s").append(ctx.getStmtId()); + } + LOG.info(forwardMsg.toString()); + + boolean isReturnToPool = false; + try { + final TMasterOpResult result = client.forward(params); + isReturnToPool = true; + return result; + } catch (TTransportException e) { + // wrap the raw exception. 
+ forwardMsg.append(" : failed"); + Exception exception = new ForwardToMasterException(String.format(forwardMsg.toString()), e); + + boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); + if (!ok) { + throw exception; + } + if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) { + throw exception; + } else { + LOG.warn(forwardMsg.append(" twice").toString(), e); + try { + TMasterOpResult result = client.forward(params); + isReturnToPool = true; + return result; + } catch (TException ex) { + throw exception; + } + } + } finally { + if (isReturnToPool) { + ClientPool.frontendPool.returnObject(thriftAddress, client); + } else { + ClientPool.frontendPool.invalidateObject(thriftAddress, client); + } + } + } + + private TMasterOpRequest buildStmtForwardParams() { TMasterOpRequest params = new TMasterOpRequest(); + //node ident + params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); + params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); params.setCluster(ctx.getClusterName()); params.setSql(originStmt.originStmt); params.setStmtIdx(originStmt.idx); @@ -126,43 +188,16 @@ public class MasterOpExecutor { if (null != ctx.queryId()) { params.setQueryId(ctx.queryId()); } + return params; + } + + private TMasterOpRequest buildSyncJournalParmas() { + final TMasterOpRequest params = new TMasterOpRequest(); //node ident params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost()); params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort()); - LOG.info("Forward statement {} to Master {}", ctx.getStmtId(), thriftAddress); - - boolean isReturnToPool = false; - try { - result = client.forward(params); - isReturnToPool = true; - } catch (TTransportException e) { - // wrap the raw exception. 
- Exception exception = new ForwardToMasterException( - String.format("Forward statement %s to Master %s failed", ctx.getStmtId(), - thriftAddress), e); - - boolean ok = ClientPool.frontendPool.reopen(client, thriftTimeoutMs); - if (!ok) { - throw exception; - } - if (shouldNotRetry || e.getType() == TTransportException.TIMED_OUT) { - throw exception; - } else { - LOG.warn("Forward statement " + ctx.getStmtId() + " to Master " + thriftAddress + " twice", e); - try { - result = client.forward(params); - isReturnToPool = true; - } catch (TException ex) { - throw exception; - } - } - } finally { - if (isReturnToPool) { - ClientPool.frontendPool.returnObject(thriftAddress, client); - } else { - ClientPool.frontendPool.invalidateObject(thriftAddress, client); - } - } + params.setSyncJournalOnly(true); + return params; } public ByteBuffer getOutputPacket() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 6618c67d7c..2461e69b2b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -353,6 +353,14 @@ public class SessionVariable implements Serializable, Writable { public static final String EXTERNAL_TABLE_ANALYZE_PART_NUM = "external_table_analyze_part_num"; + public static final String ENABLE_STRONG_CONSISTENCY = "enable_strong_consistency_read"; + + public static final String CBO_CPU_WEIGHT = "cbo_cpu_weight"; + + public static final String CBO_MEM_WEIGHT = "cbo_mem_weight"; + + public static final String CBO_NET_WEIGHT = "cbo_net_weight"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -984,6 +992,14 @@ public class SessionVariable implements Serializable, Writable { ) public boolean ignoreColumnWithComplexType = false; + @VariableMgr.VarAttr(name = ENABLE_STRONG_CONSISTENCY, description = 
{"用以开启强一致读。Doris 默认支持同一个会话内的" + + "强一致性,即同一个会话内对数据的变更操作是实时可见的。如需要会话间的强一致读,则需将此变量设置为true。", + "Used to enable strong consistent reading. By default, Doris supports strong consistency " + + "within the same session, that is, changes to data within the same session are visible in " + + "real time. If you want strong consistent reads between sessions, set this variable to true. " + }) + public boolean enableStrongConsistencyRead = false; + // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to generate some variables, // not the default value set in the code. public void initFuzzyModeVariables() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c9f7bc66e0..32d6dd7a71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -565,6 +565,7 @@ public class StmtExecutor { private void handleQueryWithRetry(TUniqueId queryId) throws Exception { // queue query here + syncJournalIfNeeded(); if (!parsedStmt.isExplain() && Config.enable_workload_group && Config.enable_query_queue && context.getSessionVariable().getEnablePipelineEngine()) { this.queryQueue = context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context); @@ -785,6 +786,14 @@ public class StmtExecutor { } } + private void syncJournalIfNeeded() throws Exception { + final Env env = context.getEnv(); + if (env.isMaster() || !context.getSessionVariable().enableStrongConsistencyRead) { + return; + } + new MasterOpExecutor(context).syncJournal(); + } + /** * get variables in stmt. 
* diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 0c00615a9f..a9f81c8ee8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -897,6 +897,11 @@ public class FrontendServiceImpl implements FrontendService.Iface { LOG.warn("reject request from invalid host. client: {}", params.getClientNodeHost()); throw new TException("request from invalid host was rejected."); } + if (params.isSyncJournalOnly()) { + final TMasterOpResult result = new TMasterOpResult(); + result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId()); + return result; + } // add this log so that we can track this stmt LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost()); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 5cf08a4e38..6cbf075739 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -468,6 +468,7 @@ struct TMasterOpRequest { 21: optional map<string, string> trace_carrier 22: optional string clientNodeHost 23: optional i32 clientNodePort + 24: optional bool syncJournalOnly // if set to true, this request means to do nothing but just sync max journal id of master } struct TColumnDefinition { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
