This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 58b00858ab [Refactor](pipeline) Remove unless fe session variable
enable_rpc_opt_for_pipeline (#18019)
58b00858ab is described below
commit 58b00858abcbcb5429085e32951abb34eb320e89
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 23 07:27:58 2023 +0800
[Refactor](pipeline) Remove unless fe session variable
enable_rpc_opt_for_pipeline (#18019)
---
.../org/apache/doris/catalog/ScalarFunction.java | 2 +-
.../org/apache/doris/common/util/VectorizedUtil.java | 9 ---------
.../main/java/org/apache/doris/qe/Coordinator.java | 20 ++++++++------------
.../java/org/apache/doris/qe/SessionVariable.java | 7 -------
4 files changed, 9 insertions(+), 29 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java
index 5e3906d93b..f5c11fd4c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java
@@ -412,7 +412,7 @@ public class ScalarFunction extends Function {
public TFunction toThrift(Type realReturnType, Type[] realArgTypes) {
TFunction fn = super.toThrift(realReturnType, realArgTypes);
fn.setScalarFn(new TScalarFunction());
- if (getBinaryType() != TFunctionBinaryType.BUILTIN ||
!VectorizedUtil.optRpcForPipeline()) {
+ if (getBinaryType() != TFunctionBinaryType.BUILTIN ||
!VectorizedUtil.isPipeline()) {
fn.getScalarFn().setSymbol(symbolName);
if (prepareFnSymbol != null) {
fn.getScalarFn().setPrepareFnSymbol(prepareFnSymbol);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
index 39dc84ac6b..ade791dd2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
@@ -41,13 +41,4 @@ public class VectorizedUtil {
}
return connectContext.getSessionVariable().enablePipelineEngine();
}
-
- public static boolean optRpcForPipeline() {
- ConnectContext connectContext = ConnectContext.get();
- if (connectContext == null) {
- return false;
- }
- return connectContext.getSessionVariable().enablePipelineEngine()
- &&
connectContext.getSessionVariable().enableRpcOptForPipeline();
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index fdba950667..5aa22b27e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -249,8 +249,6 @@ public class Coordinator {
private boolean enablePipelineEngine = false;
- private boolean enableRpcOptForPipeline = false;
-
// Runtime filter merge instance address and ID
public TNetworkAddress runtimeFilterMergeAddr;
public TUniqueId runtimeFilterMergeInstanceId;
@@ -328,8 +326,6 @@ public class Coordinator {
this.returnedAllResults = false;
this.enableShareHashTableForBroadcastJoin =
context.getSessionVariable().enableShareHashTableForBroadcastJoin;
this.enablePipelineEngine =
context.getSessionVariable().enablePipelineEngine;
- this.enableRpcOptForPipeline =
context.getSessionVariable().enablePipelineEngine
- && context.getSessionVariable().enableRpcOptForPipeline;
initQueryOptions(context);
setFromUserProperty(analyzer);
@@ -499,7 +495,7 @@ public class Coordinator {
public Map<String, Integer> getBeToInstancesNum() {
Map<String, Integer> result = Maps.newTreeMap();
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
result.put(ctxs.brpcAddr.hostname.concat(":").concat("" +
ctxs.brpcAddr.port), ctxs.ctxs.size());
}
@@ -644,7 +640,7 @@ public class Coordinator {
profileDoneSignal.addMark(instanceId, -1L /* value is meaningless
*/);
}
if (!isPointQuery) {
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
sendPipelineCtx();
} else {
sendFragment();
@@ -1273,7 +1269,7 @@ public class Coordinator {
}
private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason
cancelReason) {
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
ctx.cancelFragmentInstance(cancelReason);
}
@@ -2098,7 +2094,7 @@ public class Coordinator {
}
public void updateFragmentExecStatus(TReportExecStatusParams params) {
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
PipelineExecContext ctx =
pipelineExecContexts.get(params.getFragmentId());
if (!ctx.updateProfile(params)) {
return;
@@ -2217,7 +2213,7 @@ public class Coordinator {
}
public void endProfile(boolean waitProfileDone) {
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
if (pipelineExecContexts.isEmpty()) {
return;
}
@@ -2285,7 +2281,7 @@ public class Coordinator {
* return true if all of them are OK. Otherwise, return false.
*/
private boolean checkBackendState() {
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
if (!ctx.isBackendStateHealthy()) {
queryStatus = new Status(TStatusCode.INTERNAL_ERROR,
"backend "
@@ -3297,7 +3293,7 @@ public class Coordinator {
Lists.newArrayList();
lock();
try {
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
for (int index = 0; index < fragments.size(); index++) {
for (PipelineExecContext ctx :
pipelineExecContexts.values()) {
if (fragments.get(index).getFragmentId() !=
ctx.fragmentId) {
@@ -3326,7 +3322,7 @@ public class Coordinator {
}
private void attachInstanceProfileToFragmentProfile() {
- if (enableRpcOptForPipeline) {
+ if (enablePipelineEngine) {
for (PipelineExecContext ctx : pipelineExecContexts.values()) {
if (!ctx.computeTimeInProfile(fragmentProfile.size())) {
return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0fafbe4c7a..e218b0af7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -523,9 +523,6 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true)
public boolean enablePipelineEngine = false;
- @VariableMgr.VarAttr(name = ENABLE_RPC_OPT_FOR_PIPELINE)
- public boolean enableRpcOptForPipeline = true;
-
@VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
public boolean enableParallelOutfile = false;
@@ -1335,10 +1332,6 @@ public class SessionVariable implements Serializable,
Writable {
return enablePipelineEngine;
}
- public boolean enableRpcOptForPipeline() {
- return enableRpcOptForPipeline;
- }
-
public void setEnablePipelineEngine(boolean enablePipelineEngine) {
this.enablePipelineEngine = enablePipelineEngine;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]