This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 58b00858ab [Refactor](pipeline) Remove unless fe session variable 
enable_rpc_opt_for_pipeline (#18019)
58b00858ab is described below

commit 58b00858abcbcb5429085e32951abb34eb320e89
Author: HappenLee <[email protected]>
AuthorDate: Thu Mar 23 07:27:58 2023 +0800

    [Refactor](pipeline) Remove unless fe session variable 
enable_rpc_opt_for_pipeline (#18019)
---
 .../org/apache/doris/catalog/ScalarFunction.java     |  2 +-
 .../org/apache/doris/common/util/VectorizedUtil.java |  9 ---------
 .../main/java/org/apache/doris/qe/Coordinator.java   | 20 ++++++++------------
 .../java/org/apache/doris/qe/SessionVariable.java    |  7 -------
 4 files changed, 9 insertions(+), 29 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java
index 5e3906d93b..f5c11fd4c4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ScalarFunction.java
@@ -412,7 +412,7 @@ public class ScalarFunction extends Function {
     public TFunction toThrift(Type realReturnType, Type[] realArgTypes) {
         TFunction fn = super.toThrift(realReturnType, realArgTypes);
         fn.setScalarFn(new TScalarFunction());
-        if (getBinaryType() != TFunctionBinaryType.BUILTIN || 
!VectorizedUtil.optRpcForPipeline()) {
+        if (getBinaryType() != TFunctionBinaryType.BUILTIN || 
!VectorizedUtil.isPipeline()) {
             fn.getScalarFn().setSymbol(symbolName);
             if (prepareFnSymbol != null) {
                 fn.getScalarFn().setPrepareFnSymbol(prepareFnSymbol);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
index 39dc84ac6b..ade791dd2d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/VectorizedUtil.java
@@ -41,13 +41,4 @@ public class VectorizedUtil {
         }
         return connectContext.getSessionVariable().enablePipelineEngine();
     }
-
-    public static boolean optRpcForPipeline() {
-        ConnectContext connectContext = ConnectContext.get();
-        if (connectContext == null) {
-            return false;
-        }
-        return connectContext.getSessionVariable().enablePipelineEngine()
-                && 
connectContext.getSessionVariable().enableRpcOptForPipeline();
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index fdba950667..5aa22b27e2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -249,8 +249,6 @@ public class Coordinator {
 
     private boolean enablePipelineEngine = false;
 
-    private boolean enableRpcOptForPipeline = false;
-
     // Runtime filter merge instance address and ID
     public TNetworkAddress runtimeFilterMergeAddr;
     public TUniqueId runtimeFilterMergeInstanceId;
@@ -328,8 +326,6 @@ public class Coordinator {
         this.returnedAllResults = false;
         this.enableShareHashTableForBroadcastJoin = 
context.getSessionVariable().enableShareHashTableForBroadcastJoin;
         this.enablePipelineEngine = 
context.getSessionVariable().enablePipelineEngine;
-        this.enableRpcOptForPipeline = 
context.getSessionVariable().enablePipelineEngine
-                && context.getSessionVariable().enableRpcOptForPipeline;
         initQueryOptions(context);
 
         setFromUserProperty(analyzer);
@@ -499,7 +495,7 @@ public class Coordinator {
 
     public Map<String, Integer> getBeToInstancesNum() {
         Map<String, Integer> result = Maps.newTreeMap();
-        if (enableRpcOptForPipeline) {
+        if (enablePipelineEngine) {
             for (PipelineExecContexts ctxs : beToPipelineExecCtxs.values()) {
                 result.put(ctxs.brpcAddr.hostname.concat(":").concat("" + 
ctxs.brpcAddr.port), ctxs.ctxs.size());
             }
@@ -644,7 +640,7 @@ public class Coordinator {
             profileDoneSignal.addMark(instanceId, -1L /* value is meaningless 
*/);
         }
         if (!isPointQuery) {
-            if (enableRpcOptForPipeline) {
+            if (enablePipelineEngine) {
                 sendPipelineCtx();
             } else {
                 sendFragment();
@@ -1273,7 +1269,7 @@ public class Coordinator {
     }
 
     private void cancelRemoteFragmentsAsync(Types.PPlanFragmentCancelReason 
cancelReason) {
-        if (enableRpcOptForPipeline) {
+        if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
                 ctx.cancelFragmentInstance(cancelReason);
             }
@@ -2098,7 +2094,7 @@ public class Coordinator {
     }
 
     public void updateFragmentExecStatus(TReportExecStatusParams params) {
-        if (enableRpcOptForPipeline) {
+        if (enablePipelineEngine) {
             PipelineExecContext ctx = 
pipelineExecContexts.get(params.getFragmentId());
             if (!ctx.updateProfile(params)) {
                 return;
@@ -2217,7 +2213,7 @@ public class Coordinator {
     }
 
     public void endProfile(boolean waitProfileDone) {
-        if (enableRpcOptForPipeline) {
+        if (enablePipelineEngine) {
             if (pipelineExecContexts.isEmpty()) {
                 return;
             }
@@ -2285,7 +2281,7 @@ public class Coordinator {
      * return true if all of them are OK. Otherwise, return false.
      */
     private boolean checkBackendState() {
-        if (enableRpcOptForPipeline) {
+        if (enablePipelineEngine) {
             for (PipelineExecContext ctx : needCheckPipelineExecContexts) {
                 if (!ctx.isBackendStateHealthy()) {
                     queryStatus = new Status(TStatusCode.INTERNAL_ERROR, 
"backend "
@@ -3297,7 +3293,7 @@ public class Coordinator {
                 Lists.newArrayList();
         lock();
         try {
-            if (enableRpcOptForPipeline) {
+            if (enablePipelineEngine) {
                 for (int index = 0; index < fragments.size(); index++) {
                     for (PipelineExecContext ctx : 
pipelineExecContexts.values()) {
                         if (fragments.get(index).getFragmentId() != 
ctx.fragmentId) {
@@ -3326,7 +3322,7 @@ public class Coordinator {
     }
 
     private void attachInstanceProfileToFragmentProfile() {
-        if (enableRpcOptForPipeline) {
+        if (enablePipelineEngine) {
             for (PipelineExecContext ctx : pipelineExecContexts.values()) {
                 if (!ctx.computeTimeInProfile(fragmentProfile.size())) {
                     return;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 0fafbe4c7a..e218b0af7c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -523,9 +523,6 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = ENABLE_PIPELINE_ENGINE, fuzzy = true)
     public boolean enablePipelineEngine = false;
 
-    @VariableMgr.VarAttr(name = ENABLE_RPC_OPT_FOR_PIPELINE)
-    public boolean enableRpcOptForPipeline = true;
-
     @VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
     public boolean enableParallelOutfile = false;
 
@@ -1335,10 +1332,6 @@ public class SessionVariable implements Serializable, 
Writable {
         return enablePipelineEngine;
     }
 
-    public boolean enableRpcOptForPipeline() {
-        return enableRpcOptForPipeline;
-    }
-
     public void setEnablePipelineEngine(boolean enablePipelineEngine) {
         this.enablePipelineEngine = enablePipelineEngine;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to