This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b1be59c799 [enhancement](query) enable strong consistency by syncing
max journal id from master (#21205)
b1be59c799 is described below
commit b1be59c799769d690414262ec0b5bf0bb6687a9e
Author: Siyang Tang <[email protected]>
AuthorDate: Thu Jul 6 10:25:38 2023 +0800
[enhancement](query) enable strong consistency by syncing max journal id
from master (#21205)
Add a session variable `enable_strong_consistency_read` to solve the
problem that load results may be briefly invisible on follower FEs, meeting users'
requirements in strong-consistency read scenarios.
When enabled, a non-master FE syncs the max journal id from the master and waits
for it to be replayed locally before executing a query.
---
docs/en/docs/advanced/variables.md | 4 +
docs/zh-CN/docs/advanced/variables.md | 4 +
.../java/org/apache/doris/qe/MasterOpExecutor.java | 107 ++++++++++++++-------
.../java/org/apache/doris/qe/SessionVariable.java | 10 ++
.../java/org/apache/doris/qe/StmtExecutor.java | 9 ++
.../apache/doris/service/FrontendServiceImpl.java | 5 +
gensrc/thrift/FrontendService.thrift | 1 +
7 files changed, 104 insertions(+), 36 deletions(-)
diff --git a/docs/en/docs/advanced/variables.md
b/docs/en/docs/advanced/variables.md
index 682b390ae5..02d863c812 100644
--- a/docs/en/docs/advanced/variables.md
+++ b/docs/en/docs/advanced/variables.md
@@ -659,6 +659,10 @@ Translated with www.DeepL.com/Translator (free version)
Controls whether to use lazy materialization technology in orc reader. The
default value is true.
+* `enable_strong_consistency_read`
+
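+ Enables strongly consistent reads. By default, Doris guarantees strong
consistency only within a single session: changes made to data in a session are
immediately visible to that same session. To get strongly consistent reads across
sessions, set this variable to true. When it is enabled on a non-master FE, each query first fetches the master's latest journal id and waits until that journal has been replayed locally.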
+
***
#### Supplementary instructions on statement execution timeout control
diff --git a/docs/zh-CN/docs/advanced/variables.md
b/docs/zh-CN/docs/advanced/variables.md
index 0cbcb57280..c7bbe3c0c7 100644
--- a/docs/zh-CN/docs/advanced/variables.md
+++ b/docs/zh-CN/docs/advanced/variables.md
@@ -645,6 +645,10 @@ try (Connection conn =
DriverManager.getConnection("jdbc:mysql://127.0.0.1:9030/
控制 orc reader 是否启用延迟物化技术。默认为 true。
+* `enable_strong_consistency_read`
+
+ 用以开启强一致读。Doris
默认支持同一个会话内的强一致性,即同一个会话内对数据的变更操作是实时可见的。如需要会话间的强一致读,则需将此变量设置为true。
+
***
#### 关于语句执行超时控制的补充说明
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
index 9bbbd598d6..ba8af0acfb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MasterOpExecutor.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.telemetry.Telemetry;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TMasterOpRequest;
@@ -68,24 +69,40 @@ public class MasterOpExecutor {
this.shouldNotRetry = !isQuery;
}
+ /**
+ * used for simply syncing journal with master under strong consistency
mode
+ */
+ public MasterOpExecutor(ConnectContext ctx) {
+ this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true);
+ }
+
public void execute() throws Exception {
Span forwardSpan =
ctx.getTracer().spanBuilder("forward").setParent(Context.current())
.startSpan();
try (Scope ignored = forwardSpan.makeCurrent()) {
- forward();
+ result = forward(buildStmtForwardParams());
} catch (Exception e) {
forwardSpan.recordException(e);
throw e;
} finally {
forwardSpan.end();
}
+ waitOnReplaying();
+ }
+
+ public void syncJournal() throws Exception {
+ result = forward(buildSyncJournalParmas());
+ waitOnReplaying();
+ }
+
+ private void waitOnReplaying() throws DdlException {
LOG.info("forwarding to master get result max journal id: {}",
result.maxJournalId);
ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId,
waitTimeoutMs);
}
// Send request to Master
- private void forward() throws Exception {
+ private TMasterOpResult forward(TMasterOpRequest params) throws Exception {
if (!ctx.getEnv().isReady()) {
throw new Exception("Node catalog is not ready, please wait for a
while.");
}
@@ -100,7 +117,52 @@ public class MasterOpExecutor {
// may throw NullPointerException. add err msg
throw new Exception("Failed to get master client.", e);
}
+ final StringBuilder forwardMsg = new
StringBuilder(String.format("forward to Master %s", thriftAddress));
+ if (!params.isSyncJournalOnly()) {
+ forwardMsg.append(", statement: ").append(ctx.getStmtId());
+ }
+ LOG.info(forwardMsg.toString());
+
+ boolean isReturnToPool = false;
+ try {
+ final TMasterOpResult result = client.forward(params);
+ isReturnToPool = true;
+ return result;
+ } catch (TTransportException e) {
+ // wrap the raw exception.
+ forwardMsg.append(" : failed");
+ Exception exception = new
ForwardToMasterException(forwardMsg.toString(), e);
+
+ boolean ok = ClientPool.frontendPool.reopen(client,
thriftTimeoutMs);
+ if (!ok) {
+ throw exception;
+ }
+ if (shouldNotRetry || e.getType() ==
TTransportException.TIMED_OUT) {
+ throw exception;
+ } else {
+ LOG.warn(forwardMsg.append(" twice").toString(), e);
+ try {
+ TMasterOpResult result = client.forward(params);
+ isReturnToPool = true;
+ return result;
+ } catch (TException ex) {
+ throw exception;
+ }
+ }
+ } finally {
+ if (isReturnToPool) {
+ ClientPool.frontendPool.returnObject(thriftAddress, client);
+ } else {
+ ClientPool.frontendPool.invalidateObject(thriftAddress,
client);
+ }
+ }
+ }
+
+ private TMasterOpRequest buildStmtForwardParams() {
TMasterOpRequest params = new TMasterOpRequest();
+ //node ident
+ params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
+ params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
params.setCluster(ctx.getClusterName());
params.setSql(originStmt.originStmt);
params.setStmtIdx(originStmt.idx);
@@ -126,43 +188,16 @@ public class MasterOpExecutor {
if (null != ctx.queryId()) {
params.setQueryId(ctx.queryId());
}
+ return params;
+ }
+
+ private TMasterOpRequest buildSyncJournalParmas() {
+ final TMasterOpRequest params = new TMasterOpRequest();
//node ident
params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
- LOG.info("Forward statement {} to Master {}", ctx.getStmtId(),
thriftAddress);
-
- boolean isReturnToPool = false;
- try {
- result = client.forward(params);
- isReturnToPool = true;
- } catch (TTransportException e) {
- // wrap the raw exception.
- Exception exception = new ForwardToMasterException(
- String.format("Forward statement %s to Master %s failed",
ctx.getStmtId(),
- thriftAddress), e);
-
- boolean ok = ClientPool.frontendPool.reopen(client,
thriftTimeoutMs);
- if (!ok) {
- throw exception;
- }
- if (shouldNotRetry || e.getType() ==
TTransportException.TIMED_OUT) {
- throw exception;
- } else {
- LOG.warn("Forward statement " + ctx.getStmtId() + " to Master
" + thriftAddress + " twice", e);
- try {
- result = client.forward(params);
- isReturnToPool = true;
- } catch (TException ex) {
- throw exception;
- }
- }
- } finally {
- if (isReturnToPool) {
- ClientPool.frontendPool.returnObject(thriftAddress, client);
- } else {
- ClientPool.frontendPool.invalidateObject(thriftAddress,
client);
- }
- }
+ params.setSyncJournalOnly(true);
+ return params;
}
public ByteBuffer getOutputPacket() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 517bd248b5..fde5d0c8af 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -353,6 +353,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String EXTERNAL_TABLE_ANALYZE_PART_NUM =
"external_table_analyze_part_num";
+ public static final String ENABLE_STRONG_CONSISTENCY =
"enable_strong_consistency_read";
+
public static final String CBO_CPU_WEIGHT = "cbo_cpu_weight";
public static final String CBO_MEM_WEIGHT = "cbo_mem_weight";
@@ -1023,6 +1025,14 @@ public class SessionVariable implements Serializable,
Writable {
)
public boolean ignoreColumnWithComplexType = false;
+ @VariableMgr.VarAttr(name = ENABLE_STRONG_CONSISTENCY, description =
{"用以开启强一致读。Doris 默认支持同一个会话内的"
+ + "强一致性,即同一个会话内对数据的变更操作是实时可见的。如需要会话间的强一致读,则需将此变量设置为true。",
+ "Used to enable strong consistent reading. By default, Doris
supports strong consistency "
+ + "within the same session, that is, changes to data
within the same session are visible in "
+ + "real time. If you want strong consistent reads between
sessions, set this variable to true. "
+ })
+ public boolean enableStrongConsistencyRead = false;
+
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to
generate some variables,
// not the default value set in the code.
public void initFuzzyModeVariables() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 9cb5a7fe2a..c538c0eb37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -563,6 +563,7 @@ public class StmtExecutor {
private void handleQueryWithRetry(TUniqueId queryId) throws Exception {
// queue query here
+ syncJournalIfNeeded();
if (!parsedStmt.isExplain() && Config.enable_workload_group &&
Config.enable_query_queue
&& context.getSessionVariable().getEnablePipelineEngine()) {
this.queryQueue =
context.getEnv().getWorkloadGroupMgr().getWorkloadGroupQueryQueue(context);
@@ -783,6 +784,14 @@ public class StmtExecutor {
}
}
+ private void syncJournalIfNeeded() throws Exception {
+ final Env env = context.getEnv();
+ if (env.isMaster() ||
!context.getSessionVariable().enableStrongConsistencyRead) {
+ return;
+ }
+ new MasterOpExecutor(context).syncJournal();
+ }
+
/**
* get variables in stmt.
*
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 0c00615a9f..a9f81c8ee8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -897,6 +897,11 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
LOG.warn("reject request from invalid host. client: {}",
params.getClientNodeHost());
throw new TException("request from invalid host was rejected.");
}
+ if (params.isSyncJournalOnly()) {
+ final TMasterOpResult result = new TMasterOpResult();
+ result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
+ return result;
+ }
// add this log so that we can track this stmt
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(),
params.getClientNodeHost());
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 5cf08a4e38..6cbf075739 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -468,6 +468,7 @@ struct TMasterOpRequest {
21: optional map<string, string> trace_carrier
22: optional string clientNodeHost
23: optional i32 clientNodePort
+ 24: optional bool syncJournalOnly // if set to true, this request means to
do nothing but just sync max journal id of master
}
struct TColumnDefinition {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]