[ 
https://issues.apache.org/jira/browse/DRILL-6731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16645517#comment-16645517
 ] 

ASF GitHub Bot commented on DRILL-6731:
---------------------------------------

sohami closed pull request #1459: DRILL-6731: Move the BFs aggregating work 
from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 608f05c567a..88c21d9e957 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -21,6 +21,7 @@
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import 
org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.schema.SchemaPlus;
 import org.apache.drill.common.config.DrillConfig;
@@ -160,16 +161,15 @@ BufferAllocator getNewChildAllocator(final String 
operatorName,
   void close();
 
   /**
-   * Return null ,if setRuntimeFilter not being called
    * @return
    */
-  RuntimeFilterWritable getRuntimeFilter();
+  RuntimeFilterSink getRuntimeFilterSink();
 
   /**
-   * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same 
MinorFragment
+   * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same 
MinorFragment
    * @param runtimeFilter
    */
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
 
   interface ExecutorState {
     /**
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index a8980785a32..fcfdc8c6b54 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -61,6 +61,7 @@
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -136,7 +137,7 @@ public void interrupt(final InterruptedException e) {
   /** Stores constants and their holders by type */
   private final Map<String, Map<MinorType, ValueHolder>> 
constantValueHolderCache;
 
-  private RuntimeFilterWritable runtimeFilterWritable;
+  private RuntimeFilterSink runtimeFilterSink;
 
   /**
    * Create a FragmentContext instance for non-root fragment.
@@ -208,6 +209,11 @@ public FragmentContextImpl(final DrillbitContext 
dbContext, final PlanFragment f
     stats = new FragmentStats(allocator, fragment.getAssignment());
     bufferManager = new BufferManagerImpl(this.allocator);
     constantValueHolderCache = Maps.newHashMap();
+    boolean enableRF = 
context.getOptionManager().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+    if (enableRF) {
+      ExecutorService executorService = context.getExecutor();
+      this.runtimeFilterSink = new RuntimeFilterSink(this.allocator, 
executorService);
+    }
   }
 
   /**
@@ -348,13 +354,13 @@ public boolean isUserAuthenticationEnabled() {
   }
 
   @Override
-  public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-    this.runtimeFilterWritable = runtimeFilter;
+  public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+    this.runtimeFilterSink.aggregate(runtimeFilter);
   }
 
   @Override
-  public RuntimeFilterWritable getRuntimeFilter() {
-    return runtimeFilterWritable;
+  public RuntimeFilterSink getRuntimeFilterSink() {
+    return runtimeFilterSink;
   }
 
   /**
@@ -470,9 +476,7 @@ public void close() {
     for (OperatorContextImpl opContext : contexts) {
       suppressingClose(opContext);
     }
-    if (runtimeFilterWritable != null) {
-      suppressingClose(runtimeFilterWritable);
-    }
+    suppressingClose(runtimeFilterSink);
     suppressingClose(bufferManager);
     suppressingClose(allocator);
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bc21580d369..9248bbc698e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -36,7 +36,9 @@
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
@@ -56,6 +58,8 @@
   private Map<String, Integer> field2id = new HashMap<>();
   private List<String> toFilterFields;
   private List<BloomFilter> bloomFilters;
+  private RuntimeFilterWritable current;
+  private RuntimeFilterWritable previous;
   private int originalRecordCount;
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
 
@@ -102,6 +106,9 @@ public void close() {
       sv2.clear();
     }
     super.close();
+    if (current != null) {
+      current.close();
+    }
   }
 
   @Override
@@ -148,30 +155,36 @@ protected boolean setupNewSchema() throws 
SchemaChangeException {
    * schema change hash64 should be reset and this method needs to be called 
again.
    */
   private void setupHashHelper() {
-    final RuntimeFilterWritable runtimeFilterWritable = 
context.getRuntimeFilter();
-
+    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
     // Check if RuntimeFilterWritable was received by the minor fragment or not
-    if (runtimeFilterWritable == null) {
+    if (!runtimeFilterSink.containOne()) {
       return;
     }
-
-    // Check if bloomFilters is initialized or not
-    if (bloomFilters == null) {
-      bloomFilters = runtimeFilterWritable.unwrap();
+    if (runtimeFilterSink.hasFreshOne()) {
+      RuntimeFilterWritable freshRuntimeFilterWritable = 
runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
+      if (current == null) {
+        current = freshRuntimeFilterWritable;
+        previous = freshRuntimeFilterWritable;
+      } else {
+        previous = current;
+        current = freshRuntimeFilterWritable;
+        previous.close();
+      }
+      bloomFilters = current.unwrap();
     }
-
     // Check if HashHelper is initialized or not
     if (hash64 == null) {
       ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(incoming, 
context);
       try {
         //generate hash helper
-        this.toFilterFields = 
runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+        this.toFilterFields = 
current.getRuntimeFilterBDef().getProbeFieldsList();
         List<LogicalExpression> hashFieldExps = new ArrayList<>();
         List<TypedFieldId> typedFieldIds = new ArrayList<>();
         for (String toFilterField : toFilterFields) {
           SchemaPath schemaPath = new SchemaPath(new 
PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
           TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
-          this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+          int[] fieldIds = typedFieldId.getFieldIds();
+          this.field2id.put(toFilterField, fieldIds[0]);
           typedFieldIds.add(typedFieldId);
           ValueVectorReadExpression toHashFieldExp = new 
ValueVectorReadExpression(typedFieldId);
           hashFieldExps.add(toHashFieldExp);
@@ -195,11 +208,9 @@ private void applyRuntimeFilter() throws 
SchemaChangeException {
       sv2.setRecordCount(0);
       return;
     }
-
-    final RuntimeFilterWritable runtimeFilterWritable = 
context.getRuntimeFilter();
+    final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
     sv2.allocateNew(originalRecordCount);
-
-    if (runtimeFilterWritable == null) {
+    if (!runtimeFilterSink.containOne()) {
       // means none of the rows are filtered out hence set all the indexes
       for (int i = 0; i < originalRecordCount; ++i) {
         sv2.setIndex(i, i);
@@ -207,10 +218,8 @@ private void applyRuntimeFilter() throws 
SchemaChangeException {
       sv2.setRecordCount(originalRecordCount);
       return;
     }
-
-    // Setup a hash helper if need be
+    // Setup a hash helper if needed
     setupHashHelper();
-
     //To make each independent bloom filter work together to construct a final 
filter result: BitSet.
     BitSet bitSet = new BitSet(originalRecordCount);
     for (int i = 0; i < toFilterFields.size(); i++) {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 658f03a338c..3d456967f08 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -724,7 +724,7 @@ private void initializeRuntimeFilter() {
     runtimeFilterReporter = new 
RuntimeFilterReporter((ExecutorFragmentContext) context);
     RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
     //RuntimeFilter is not a necessary part of a HashJoin operator, only the 
query which satisfy the
-    //RuntimeFilterManager's judgement will have the RuntimeFilterDef.
+    //RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
     if (runtimeFilterDef != null) {
       List<BloomFilterDef> bloomFilterDefs = 
runtimeFilterDef.getBloomFilterDefs();
       for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
@@ -944,7 +944,8 @@ public IterOutcome executeBuildPhase() throws 
SchemaChangeException {
 
     if (cycleNum == 0 && enableRuntimeFilter) {
       if (bloomFilters.size() > 0) {
-        runtimeFilterReporter.sendOut(bloomFilters, probeFields, 
this.popConfig.getRuntimeFilterDef().isSendToForeman());
+        int hashJoinOpId = this.popConfig.getOperatorId();
+        runtimeFilterReporter.sendOut(bloomFilters, probeFields, 
this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
       }
     }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index c31e491a388..fcfa2bca1fc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -197,11 +197,8 @@ public Void visitPrel(Prel prel, RFHelperHolder holder) 
throws RuntimeException
     @Override
     public Void visitExchange(ExchangePrel exchange, RFHelperHolder holder) 
throws RuntimeException {
       if (holder != null) {
-        boolean broadcastExchange = exchange instanceof BroadcastExchangePrel;
         if (holder.isFromBuildSide()) {
-          //To the build side ,we need to identify whether the HashJoin's 
direct children have a Broadcast node to mark
-          //this HashJoin as BroadcastHashJoin
-          holder.setEncounteredBroadcastExchange(broadcastExchange);
+          holder.setBuildSideExchange(exchange);
         }
       }
       return visitPrel(exchange, holder);
@@ -224,19 +221,11 @@ public Void visitJoin(JoinPrel prel, RFHelperHolder 
holder) throws RuntimeExcept
           Prel right = (Prel) hashJoinPrel.getRight();
           holder.setFromBuildSide(true);
           right.accept(this, holder);
-          boolean buildSideEncountererdBroadcastExchange = 
holder.isEncounteredBroadcastExchange();
-          if (buildSideEncountererdBroadcastExchange) {
-            runtimeFilterDef.setSendToForeman(false);
-          } else {
-            runtimeFilterDef.setSendToForeman(true);
-          }
+          boolean routeToForeman = holder.needToRouteToForeman();
+          runtimeFilterDef.setSendToForeman(routeToForeman);
           List<BloomFilterDef> bloomFilterDefs = 
runtimeFilterDef.getBloomFilterDefs();
           for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
-            if (buildSideEncountererdBroadcastExchange) {
-              bloomFilterDef.setLocal(true);
-            } else {
-              bloomFilterDef.setLocal(false);
-            }
+            bloomFilterDef.setLocal(!routeToForeman);
           }
         }
       }
@@ -338,18 +327,17 @@ public boolean isEncounteredBlockNode() {
    * RuntimeFilter helper util holder
    */
   private static class RFHelperHolder {
-    //whether this join operator is a partitioned HashJoin or broadcast 
HashJoin,
-    //also single node HashJoin is not expected to do JPPD.
-    private boolean encounteredBroadcastExchange;
 
     private boolean fromBuildSide;
 
-    public boolean isEncounteredBroadcastExchange() {
-      return encounteredBroadcastExchange;
+    private ExchangePrel exchangePrel;
+
+    public void setBuildSideExchange(ExchangePrel exchange){
+      this.exchangePrel = exchange;
     }
 
-    public void setEncounteredBroadcastExchange(boolean 
encounteredBroadcastExchange) {
-      this.encounteredBroadcastExchange = encounteredBroadcastExchange;
+    public boolean needToRouteToForeman() {
+      return exchangePrel != null && !(exchangePrel instanceof 
BroadcastExchangePrel);
     }
 
     public boolean isFromBuildSide() {
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index bf91ed3cdfa..0d97e0ac308 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -395,7 +395,7 @@ public void run() {
               final String originalName = currentThread.getName();
               currentThread.setName(queryIdStr + 
":foreman:registerRuntimeFilter");
               try {
-                
foreman.getRuntimeFilterManager().registerRuntimeFilter(runtimeFilter);
+                
foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
               } catch (Exception e) {
                 logger.warn("Exception while registering the RuntimeFilter", 
e);
               } finally {
@@ -413,7 +413,7 @@ public void run() {
           .setQueryId(queryId).build();
         FragmentExecutor fragmentExecutor = 
runningFragments.get(fragmentHandle);
         if (fragmentExecutor != null) {
-          fragmentExecutor.getContext().setRuntimeFilter(runtimeFilter);
+          fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter);
         }
       }
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index e6ede7a4393..6e4a9a8e511 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,7 @@ public RuntimeFilterReporter(ExecutorFragmentContext context) 
{
     this.context = context;
   }
 
-  public void sendOut(List<BloomFilter> bloomFilters, List<String> 
probeFields, boolean sendToForeman) {
+  public void sendOut(List<BloomFilter> bloomFilters, List<String> 
probeFields, boolean sendToForeman, int hashJoinOpId) {
     ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
     DrillBuf[] data = new DrillBuf[bloomFilters.size()];
     List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -63,6 +63,7 @@ public void sendOut(List<BloomFilter> bloomFilters, 
List<String> probeFields, bo
       .setMajorFragmentId(majorFragmentId)
       .setMinorFragmentId(minorFragmentId)
       .setToForeman(sendToForeman)
+      .setHjOpId(hashJoinOpId)
       .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
       .build();
     RuntimeFilterWritable runtimeFilterWritable = new 
RuntimeFilterWritable(runtimeFilterB, data);
@@ -72,7 +73,7 @@ public void sendOut(List<BloomFilter> bloomFilters, 
List<String> probeFields, bo
       AccountingDataTunnel dataTunnel = context.getDataTunnel(foremanEndpoint);
       dataTunnel.sendRuntimeFilter(runtimeFilterWritable);
     } else {
-      context.setRuntimeFilter(runtimeFilterWritable);
+      context.addRuntimeFilter(runtimeFilterWritable);
     }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
similarity index 87%
rename from 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
rename to 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index e3f89a6e727..5a8c6fc9e1f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -18,7 +18,7 @@
 package org.apache.drill.exec.work.filter;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.drill.exec.ops.AccountingDataTunnel;
 import org.apache.drill.exec.ops.Consumer;
@@ -35,7 +35,6 @@
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.GeneralRPCProtos;
 import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.RpcOutcomeListener;
 import org.apache.drill.exec.rpc.data.DataTunnel;
@@ -60,17 +59,14 @@
  * The HashJoinRecordBatch is responsible to generate the RuntimeFilter.
  * To Partitioned case:
  * The generated RuntimeFilter will be sent to the Foreman node. The Foreman 
node receives the RuntimeFilter
- * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The 
RuntimeFilterRecordBatch which
- * steps over the Scan node will leverage the received RuntimeFilter to filter 
out the scanned rows to generate
- * the SV2.
+ * async, broadcasts them to the Scan nodes's MinorFragment. The 
RuntimeFilterRecordBatch which is downstream
+ * to the Scan node will aggregate all the received RuntimeFilter and will 
leverage it to filter out the
+ * scanned rows to generate the SV2.
  * To Broadcast case:
  * The generated RuntimeFilter will be sent to Scan node's 
RuntimeFilterRecordBatch directly. The working of the
  * RuntimeFilterRecordBath is the same as the Partitioned one.
- *
- *
- *
  */
-public class RuntimeFilterManager {
+public class RuntimeFilterRouter {
 
   private Wrapper rootWrapper;
   //HashJoin node's major fragment id to its corresponding probe side nodes's 
endpoints
@@ -79,14 +75,12 @@
   private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
   //HashJoin node's major fragment id to its corresponding probe side scan 
node's belonging major fragment id
   private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
-  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
-  private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new 
ConcurrentHashMap<>();
 
   private DrillbitContext drillbitContext;
 
   private SendingAccountor sendingAccountor = new SendingAccountor();
 
-  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterManager.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterRouter.class);
 
   /**
    * This class maintains context for the runtime join push down's filter 
management. It
@@ -95,7 +89,7 @@
    * @param workUnit
    * @param drillbitContext
    */
-  public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext 
drillbitContext) {
+  public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext 
drillbitContext) {
     this.rootWrapper = workUnit.getRootWrapper();
     this.drillbitContext = drillbitContext;
   }
@@ -134,32 +128,16 @@ public void waitForComplete() {
    * @param runtimeFilterWritable
    */
   public void registerRuntimeFilter(RuntimeFilterWritable 
runtimeFilterWritable) {
-    BitData.RuntimeFilterBDef runtimeFilterB = 
runtimeFilterWritable.getRuntimeFilterBDef();
-    int majorId = runtimeFilterB.getMajorFragmentId();
-    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
-    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
-    logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, 
queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
-    int size;
-    synchronized (this) {
-      size = joinMjId2scanSize.get(majorId);
-      Preconditions.checkState(size > 0);
-      RuntimeFilterWritable aggregatedRuntimeFilter = 
joinMjId2AggregatedRF.get(majorId);
-      if (aggregatedRuntimeFilter == null) {
-        aggregatedRuntimeFilter = runtimeFilterWritable;
-      } else {
-        aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
-      }
-      joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
-      size--;
-      joinMjId2scanSize.put(majorId, size);
-    }
-    if (size == 0) {
-      broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields);
-    }
+    broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
   }
 
 
-  private void broadcastAggregatedRuntimeFilter(int joinMajorId, 
UserBitShared.QueryId queryId, List<String> probeFields) {
+  private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable 
srcRuntimeFilterWritable) {
+    BitData.RuntimeFilterBDef runtimeFilterB = 
srcRuntimeFilterWritable.getRuntimeFilterBDef();
+    int joinMajorId = runtimeFilterB.getMajorFragmentId();
+    UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+    List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+    DrillBuf[] data = srcRuntimeFilterWritable.getData();
     List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = 
joinMjId2probdeScanEps.get(joinMajorId);
     int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
     for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
@@ -172,10 +150,8 @@ private void broadcastAggregatedRuntimeFilter(int 
joinMajorId, UserBitShared.Que
         .setMajorFragmentId(scanNodeMjId)
         .setMinorFragmentId(minorId)
         .build();
-      RuntimeFilterWritable aggregatedRuntimeFilter = 
joinMjId2AggregatedRF.get(joinMajorId);
-      RuntimeFilterWritable runtimeFilterWritable = new 
RuntimeFilterWritable(runtimeFilterBDef, aggregatedRuntimeFilter.getData());
+      RuntimeFilterWritable runtimeFilterWritable = new 
RuntimeFilterWritable(runtimeFilterBDef, data);
       CoordinationProtos.DrillbitEndpoint drillbitEndpoint = 
scanNodeEps.get(minorId);
-
       DataTunnel dataTunnel = 
drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
       Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
         @Override
@@ -235,8 +211,6 @@ public Void visitOp(PhysicalOperator op, RFHelperHolder 
holder) throws RuntimeEx
 
   private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, 
Void, RuntimeException> {
 
-    private PhysicalOperator targetOp;
-
     private Fragment fragment;
 
     private boolean contain = false;
@@ -251,7 +225,6 @@ public Void visitOp(PhysicalOperator op, RFHelperHolder 
holder) throws RuntimeEx
 
 
     public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment 
fragment) {
-      this.targetOp = targetOp;
       this.fragment = fragment;
       this.targetIsGroupScan = targetOp instanceof GroupScan;
       this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
@@ -343,13 +316,10 @@ private Wrapper findPhysicalOpContainer(Wrapper wrapper, 
PhysicalOperator op) {
 
     private int probeSideScanMajorId;
 
-
-
     private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints;
 
     private RuntimeFilterDef runtimeFilterDef;
 
-
     public List<CoordinationProtos.DrillbitEndpoint> 
getProbeSideScanEndpoints() {
       return probeSideScanEndpoints;
     }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
new file mode 100644
index 00000000000..14686254f93
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This sink receives the RuntimeFilters from the netty thread,
+ * aggregates them in an async thread, supplies the aggregated
+ * one to the fragment running thread.
+ */
+public class RuntimeFilterSink implements AutoCloseable {
+
+  private AtomicInteger currentBookId = new AtomicInteger(0);
+
+  private int staleBookId = 0;
+
+  /**
+   * RuntimeFilterWritable holding the aggregated version of all the received 
filter
+   */
+  private RuntimeFilterWritable aggregated = null;
+
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new 
LinkedBlockingQueue<>();
+
+  /**
+   * Flag used by Minor Fragment thread to indicate it has encountered error
+   */
+  private AtomicBoolean running = new AtomicBoolean(true);
+
+  /**
+   * Lock used to synchronize between producer (Netty Thread) and consumer 
(AsyncAggregateThread) of elements of this
+   * queue. This is needed because in error condition running flag can be 
consumed by producer and consumer thread at
+   * different times. Whoever sees it first will take this lock and clear all 
elements and set the queue to null to
+   * indicate producer not to put any new elements in it.
+   */
+  private ReentrantLock queueLock = new ReentrantLock();
+
+  private Condition notEmpty = queueLock.newCondition();
+
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private BufferAllocator bufferAllocator;
+
+  private Future future;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService 
executorService) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    future = executorService.submit(asyncAggregateWorker);
+  }
+
+  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
+    if (running.get()) {
+      try {
+        aggregatedRFLock.lock();
+        if (containOne()) {
+          boolean same = aggregated.equals(runtimeFilterWritable);
+          if (!same) {
+            // This is to solve the only one fragment case that two 
RuntimeFilterRecordBatchs
+            // share the same FragmentContext.
+            aggregated.close();
+            currentBookId.set(0);
+            staleBookId = 0;
+            clearQueued(false);
+          }
+        }
+      } finally {
+        aggregatedRFLock.unlock();
+      }
+
+      try {
+        queueLock.lock();
+        if (rfQueue != null) {
+          rfQueue.add(runtimeFilterWritable);
+          notEmpty.signal();
+        } else {
+          runtimeFilterWritable.close();
+        }
+      } finally {
+        queueLock.unlock();
+      }
+    } else {
+      runtimeFilterWritable.close();
+    }
+  }
+
+  public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
+    try {
+      aggregatedRFLock.lock();
+      return aggregated.duplicate(bufferAllocator);
+    } finally {
+      aggregatedRFLock.unlock();
+    }
+  }
+
+  /**
+   * whether there's a fresh aggregated RuntimeFilter
+   *
+   * @return
+   */
+  public boolean hasFreshOne() {
+    if (currentBookId.get() > staleBookId) {
+      staleBookId = currentBookId.get();
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * whether there's a usable RuntimeFilter.
+   *
+   * @return
+   */
+  public boolean containOne() {
+    return aggregated != null;
+  }
+
+  private void doCleanup() {
+    running.compareAndSet(true, false);
+    try {
+      aggregatedRFLock.lock();
+      if (containOne()) {
+        aggregated.close();
+        aggregated = null;
+      }
+    } finally {
+      aggregatedRFLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() throws Exception {
+    future.cancel(true);
+    doCleanup();
+  }
+
+  private void clearQueued(boolean setToNull) {
+    RuntimeFilterWritable toClear;
+    try {
+      queueLock.lock();
+      while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
+        toClear.close();
+      }
+      rfQueue = (setToNull) ? null : rfQueue;
+    } finally {
+      queueLock.unlock();
+    }
+  }
+
+  private class AsyncAggregateWorker implements Runnable {
+
+    @Override
+    public void run() {
+      try {
+        RuntimeFilterWritable toAggregate = null;
+        while (running.get()) {
+          try {
+            queueLock.lock();
+            toAggregate = (rfQueue != null) ? rfQueue.poll() :  null;
+            if (toAggregate == null) {
+              notEmpty.await();
+              continue;
+            }
+          } finally {
+            queueLock.unlock();
+          }
+
+          try {
+            aggregatedRFLock.lock();
+            if (containOne()) {
+              aggregated.aggregate(toAggregate);
+
+              // Release the byteBuf referenced by toAggregate since aggregate 
will not do it
+              toAggregate.close();
+            } else {
+              aggregated = toAggregate;
+            }
+          } finally {
+            aggregatedRFLock.unlock();
+          }
+          currentBookId.incrementAndGet();
+        }
+      } catch (InterruptedException e) {
+        logger.info("RFAggregating Thread : {} was interrupted.", 
Thread.currentThread().getName());
+        Thread.currentThread().interrupt();
+      } finally {
+        doCleanup();
+        clearQueued(true);
+      }
+    }
+  }
+}
+
+
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 8649e15af1e..9a971e94cbc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -20,6 +20,7 @@
 
 import io.netty.buffer.DrillBuf;
 import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.BitData;
 
 import java.util.ArrayList;
@@ -29,15 +30,20 @@
  * A binary wire transferable representation of the RuntimeFilter which 
contains
  * the runtime filter definition and its corresponding data.
  */
-public class RuntimeFilterWritable implements AutoCloseables.Closeable {
+public class RuntimeFilterWritable implements AutoCloseables.Closeable{
 
   private BitData.RuntimeFilterBDef runtimeFilterBDef;
 
   private DrillBuf[] data;
 
+  private String identifier;
+
   public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, 
DrillBuf... data) {
     this.runtimeFilterBDef = runtimeFilterBDef;
     this.data = data;
+    this.identifier = "majorFragmentId:" + 
runtimeFilterBDef.getMajorFragmentId()
+      + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
+      + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
   }
 
 
@@ -81,11 +87,47 @@ public void aggregate(RuntimeFilterWritable 
runtimeFilterWritable) {
     }
   }
 
+  public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+    int len = data.length;
+    DrillBuf[] cloned = new DrillBuf[len];
+    int i = 0;
+    for (DrillBuf src : data) {
+      int capacity = src.readableBytes();
+      DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+      int readerIndex = src.readerIndex();
+      duplicateOne.writeBytes(src);
+      src.readerIndex(readerIndex);
+      cloned[i] = duplicateOne;
+      i++;
+    }
+    return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
+  }
+
+  public String toString() {
+    return identifier;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other instanceof RuntimeFilterWritable) {
+      RuntimeFilterWritable otherRFW = (RuntimeFilterWritable) other;
+      return this.identifier.equals(otherRFW.identifier);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return identifier.hashCode();
+  }
+
   @Override
   public void close() {
     for (DrillBuf buf : data) {
       buf.release();
     }
   }
-
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 634e8328cf5..42b76f278c7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.work.foreman;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
@@ -61,7 +62,6 @@
 import org.apache.drill.exec.util.Pointer;
 import org.apache.drill.exec.work.QueryWorkUnit;
 import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.filter.RuntimeFilterManager;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
 import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
 import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
@@ -122,7 +122,7 @@
 
   private String queryText;
 
-  private RuntimeFilterManager runtimeFilterManager;
+  private RuntimeFilterRouter runtimeFilterRouter;
   private boolean enableRuntimeFilter;
 
   /**
@@ -410,8 +410,8 @@ private void runPhysicalPlan(final PhysicalPlan plan, 
Pointer<String> textPlan)
     queryRM.visitAbstractPlan(plan);
     final QueryWorkUnit work = getQueryWorkUnit(plan);
     if (enableRuntimeFilter) {
-      runtimeFilterManager = new RuntimeFilterManager(work, drillbitContext);
-      runtimeFilterManager.collectRuntimeFilterParallelAndControlInfo();
+      runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
+      runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
     }
     if (textPlan != null) {
       queryManager.setPlanText(textPlan.value);
@@ -734,8 +734,8 @@ public void close() {
 
       logger.debug(queryIdString + ": cleaning up.");
       injector.injectPause(queryContext.getExecutionControls(), 
"foreman-cleanup", logger);
-      if (enableRuntimeFilter && runtimeFilterManager != null) {
-        runtimeFilterManager.waitForComplete();
+      if (enableRuntimeFilter && runtimeFilterRouter != null) {
+        runtimeFilterRouter.waitForComplete();
       }
       // remove the channel disconnected listener (doesn't throw)
       closeFuture.removeListener(closeListener);
@@ -866,8 +866,8 @@ public void interrupted(final InterruptedException e) {
   }
 
 
-  public RuntimeFilterManager getRuntimeFilterManager() {
-    return runtimeFilterManager;
+  public RuntimeFilterRouter getRuntimeFilterRouter() {
+    return runtimeFilterRouter;
   }
 
 }
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java 
b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index f867015725c..a1e7d0d2a41 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.test;
 
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.shaded.guava.com.google.common.base.Function;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -80,6 +81,7 @@
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Test fixture for operator and (especially) "sub-operator" tests.
@@ -180,6 +182,7 @@ public OperatorFixture build() {
 
     private ExecutorState executorState = new 
OperatorFixture.MockExecutorState();
     private ExecutionControls controls;
+    private RuntimeFilterSink runtimeFilterSink;
 
     public MockFragmentContext(final DrillConfig config,
                                final OptionManager options,
@@ -195,6 +198,7 @@ public MockFragmentContext(final DrillConfig config,
       this.controls = new ExecutionControls(options);
       compiler = new CodeCompiler(config, options);
       bufferManager = new BufferManagerImpl(allocator);
+      this.runtimeFilterSink = new RuntimeFilterSink(allocator, 
Executors.newCachedThreadPool());
     }
 
     private static FunctionImplementationRegistry newFunctionRegistry(
@@ -315,13 +319,13 @@ public void close() {
     }
 
     @Override
-    public RuntimeFilterWritable getRuntimeFilter() {
-      return null;
+    public RuntimeFilterSink getRuntimeFilterSink() {
+      return runtimeFilterSink;
     }
 
     @Override
-    public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+      runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
 
b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 1c4779c3e7f..300e88bf243 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,6 +38,7 @@
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.easy.json.JSONRecordReader;
 import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
 import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -203,11 +204,12 @@ protected OperatorTestBuilder opTestBuilder() {
    * </p>
    */
   protected static class MockExecutorFragmentContext extends 
OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
-    private RuntimeFilterWritable runtimeFilterWritable;
+    private RuntimeFilterSink runtimeFilterSink;
 
     public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
       super(fragmentContext.getConfig(), fragmentContext.getOptions(), 
fragmentContext.getAllocator(),
         fragmentContext.getScanExecutor(), 
fragmentContext.getScanDecodeExecutor());
+      this.runtimeFilterSink = new 
RuntimeFilterSink(fragmentContext.getAllocator(), 
Executors.newCachedThreadPool());
     }
 
     @Override
@@ -304,13 +306,13 @@ public boolean isUserAuthenticationEnabled() {
     }
 
     @Override
-    public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-      this.runtimeFilterWritable = runtimeFilter;
+    public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+      this.runtimeFilterSink.aggregate(runtimeFilter);
     }
 
     @Override
-    public RuntimeFilterWritable getRuntimeFilter() {
-      return runtimeFilterWritable;
+    public RuntimeFilterSink getRuntimeFilterSink() {
+      return runtimeFilterSink;
     }
   }
 
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java 
b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
index 20abb3b62cb..d7921fc0f9f 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java
@@ -2518,6 +2518,24 @@ public Builder clearIsLastBatch() {
      */
     com.google.protobuf.ByteString
         getProbeFieldsBytes(int index);
+
+    // optional int32 hj_op_id = 7;
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    boolean hasHjOpId();
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    int getHjOpId();
   }
   /**
    * Protobuf type {@code exec.bit.data.RuntimeFilterBDef}
@@ -2627,6 +2645,11 @@ private RuntimeFilterBDef(
               probeFields_.add(input.readBytes());
               break;
             }
+            case 56: {
+              bitField0_ |= 0x00000010;
+              hjOpId_ = input.readInt32();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -2820,6 +2843,30 @@ public int getProbeFieldsCount() {
       return probeFields_.getByteString(index);
     }
 
+    // optional int32 hj_op_id = 7;
+    public static final int HJ_OP_ID_FIELD_NUMBER = 7;
+    private int hjOpId_;
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    public boolean hasHjOpId() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional int32 hj_op_id = 7;</code>
+     *
+     * <pre>
+     * the operator id of the HashJoin which generates this RuntimeFilter
+     * </pre>
+     */
+    public int getHjOpId() {
+      return hjOpId_;
+    }
+
     private void initFields() {
       queryId_ = 
org.apache.drill.exec.proto.UserBitShared.QueryId.getDefaultInstance();
       majorFragmentId_ = 0;
@@ -2827,6 +2874,7 @@ private void initFields() {
       toForeman_ = false;
       bloomFilterSizeInBytes_ = java.util.Collections.emptyList();
       probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      hjOpId_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -2858,6 +2906,9 @@ public void writeTo(com.google.protobuf.CodedOutputStream 
output)
       for (int i = 0; i < probeFields_.size(); i++) {
         output.writeBytes(6, probeFields_.getByteString(i));
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeInt32(7, hjOpId_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -2901,6 +2952,10 @@ public int getSerializedSize() {
         size += dataSize;
         size += 1 * getProbeFieldsList().size();
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(7, hjOpId_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -3034,6 +3089,8 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x00000010);
         probeFields_ = com.google.protobuf.LazyStringArrayList.EMPTY;
         bitField0_ = (bitField0_ & ~0x00000020);
+        hjOpId_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
 
@@ -3093,6 +3150,10 @@ public Builder clone() {
           bitField0_ = (bitField0_ & ~0x00000020);
         }
         result.probeFields_ = probeFields_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.hjOpId_ = hjOpId_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -3141,6 +3202,9 @@ public Builder 
mergeFrom(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef o
           }
           onChanged();
         }
+        if (other.hasHjOpId()) {
+          setHjOpId(other.getHjOpId());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -3595,6 +3659,55 @@ public Builder addProbeFieldsBytes(
         return this;
       }
 
+      // optional int32 hj_op_id = 7;
+      private int hjOpId_ ;
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public boolean hasHjOpId() {
+        return ((bitField0_ & 0x00000040) == 0x00000040);
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public int getHjOpId() {
+        return hjOpId_;
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public Builder setHjOpId(int value) {
+        bitField0_ |= 0x00000040;
+        hjOpId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional int32 hj_op_id = 7;</code>
+       *
+       * <pre>
+       * the operator id of the HashJoin which generates this RuntimeFilter
+       * </pre>
+       */
+      public Builder clearHjOpId() {
+        bitField0_ = (bitField0_ & ~0x00000040);
+        hjOpId_ = 0;
+        onChanged();
+        return this;
+      }
+
       // 
@@protoc_insertion_point(builder_scope:exec.bit.data.RuntimeFilterBDef)
     }
 
@@ -3648,16 +3761,16 @@ public Builder addProbeFieldsBytes(
       " \003(\005\022!\n\031sending_major_fragment_id\030\004 \001(\005\022" +
       "!\n\031sending_minor_fragment_id\030\005 \001(\005\022(\n\003de" +
       "f\030\006 \001(\0132\033.exec.shared.RecordBatchDef\022\023\n\013" +
-      "isLastBatch\030\007 \001(\010\"\277\001\n\021RuntimeFilterBDef\022" +
+      "isLastBatch\030\007 \001(\010\"\321\001\n\021RuntimeFilterBDef\022" +
       "&\n\010query_id\030\001 \001(\0132\024.exec.shared.QueryId\022" +
       "\031\n\021major_fragment_id\030\002 \001(\005\022\031\n\021minor_frag" +
       "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 
\001(\010\022\"\n\032blo" +
       "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" +
-      "ields\030\006 
\003(\t*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n" +
-      
"\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH\020",
-      "\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILTE" +
-      "R\020\005B(\n\033org.apache.drill.exec.protoB\007BitD" +
-      "ataH\001"
+      "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 
\001(\005*n\n\007RpcType" +
+      
"\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n",
+      
"\020REQ_RECORD_BATCH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n" +
+      "\022REQ_RUNTIME_FILTER\020\005B(\n\033org.apache.dril" +
+      "l.exec.protoB\007BitDataH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner 
assigner =
       new 
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -3687,7 +3800,7 @@ public Builder addProbeFieldsBytes(
           internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable = 
new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_exec_bit_data_RuntimeFilterBDef_descriptor,
-              new java.lang.String[] { "QueryId", "MajorFragmentId", 
"MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", });
+              new java.lang.String[] { "QueryId", "MajorFragmentId", 
"MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", 
"HjOpId", });
           return null;
         }
       };
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java 
b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
index 6aa54dd9551..3c88ffcedbd 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java
@@ -441,6 +441,8 @@ public void writeTo(com.dyuproject.protostuff.Output 
output, org.apache.drill.ex
                     output.writeInt32(5, bloomFilterSizeInBytes, true);
                 for(String probeFields : message.getProbeFieldsList())
                     output.writeString(6, probeFields, true);
+                if(message.hasHjOpId())
+                    output.writeInt32(7, message.getHjOpId(), false);
             }
             public boolean 
isInitialized(org.apache.drill.exec.proto.BitData.RuntimeFilterBDef message)
             {
@@ -499,6 +501,9 @@ public void mergeFrom(com.dyuproject.protostuff.Input 
input, org.apache.drill.ex
                         case 6:
                             builder.addProbeFields(input.readString());
                             break;
+                        case 7:
+                            builder.setHjOpId(input.readInt32());
+                            break;
                         default:
                             input.handleUnknownField(number, this);
                     }
@@ -545,6 +550,7 @@ public void writeTo(com.dyuproject.protostuff.Output 
output, org.apache.drill.ex
                 case 4: return "toForeman";
                 case 5: return "bloomFilterSizeInBytes";
                 case 6: return "probeFields";
+                case 7: return "hjOpId";
                 default: return null;
             }
         }
@@ -562,6 +568,7 @@ public static int getFieldNumber(java.lang.String name)
             fieldMap.put("toForeman", 4);
             fieldMap.put("bloomFilterSizeInBytes", 5);
             fieldMap.put("probeFields", 6);
+            fieldMap.put("hjOpId", 7);
         }
     }
 
diff --git 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
index 99f3c78db76..2d1c2a70253 100644
--- 
a/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
+++ 
b/protocol/src/main/java/org/apache/drill/exec/proto/beans/RuntimeFilterBDef.java
@@ -55,6 +55,7 @@ public static RuntimeFilterBDef getDefaultInstance()
     private Boolean toForeman;
     private List<Integer> bloomFilterSizeInBytes;
     private List<String> probeFields;
+    private int hjOpId;
 
     public RuntimeFilterBDef()
     {
@@ -141,6 +142,19 @@ public RuntimeFilterBDef setProbeFieldsList(List<String> 
probeFields)
         return this;
     }
 
+    // hjOpId
+
+    public int getHjOpId()
+    {
+        return hjOpId;
+    }
+
+    public RuntimeFilterBDef setHjOpId(int hjOpId)
+    {
+        this.hjOpId = hjOpId;
+        return this;
+    }
+
     // java serialization
 
     public void readExternal(ObjectInput in) throws IOException
@@ -218,6 +232,9 @@ public void mergeFrom(Input input, RuntimeFilterBDef 
message) throws IOException
                         message.probeFields = new ArrayList<String>();
                     message.probeFields.add(input.readString());
                     break;
+                case 7:
+                    message.hjOpId = input.readInt32();
+                    break;
                 default:
                     input.handleUnknownField(number, this);
             }   
@@ -257,6 +274,9 @@ public void writeTo(Output output, RuntimeFilterBDef 
message) throws IOException
                     output.writeString(6, probeFields, true);
             }
         }
+
+        if(message.hjOpId != 0)
+            output.writeInt32(7, message.hjOpId, false);
     }
 
     public String getFieldName(int number)
@@ -269,6 +289,7 @@ public String getFieldName(int number)
             case 4: return "toForeman";
             case 5: return "bloomFilterSizeInBytes";
             case 6: return "probeFields";
+            case 7: return "hjOpId";
             default: return null;
         }
     }
@@ -288,6 +309,7 @@ public int getFieldNumber(String name)
         __fieldMap.put("toForeman", 4);
         __fieldMap.put("bloomFilterSizeInBytes", 5);
         __fieldMap.put("probeFields", 6);
+        __fieldMap.put("hjOpId", 7);
     }
     
 }
diff --git a/protocol/src/main/protobuf/BitData.proto 
b/protocol/src/main/protobuf/BitData.proto
index 063efe45851..15c72308ee2 100644
--- a/protocol/src/main/protobuf/BitData.proto
+++ b/protocol/src/main/protobuf/BitData.proto
@@ -47,4 +47,5 @@ message RuntimeFilterBDef{
   optional bool to_foreman = 4; // true means sending to foreman,false means 
sending to scan nodes
   repeated int32 bloom_filter_size_in_bytes = 5;
   repeated string probe_fields = 6; // probe fields with corresponding 
BloomFilters
+  optional int32 hj_op_id = 7; // the operator id of the HashJoin which 
generates this RuntimeFilter
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JPPD:Move aggregating the BF from the Foreman to the RuntimeFilter
> ------------------------------------------------------------------
>
>                 Key: DRILL-6731
>                 URL: https://issues.apache.org/jira/browse/DRILL-6731
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components:  Server
>    Affects Versions: 1.15.0
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>              Labels: ready-to-commit
>             Fix For: 1.15.0
>
>
> This PR is to move the BloomFilter aggregating work from the foreman to 
> RuntimeFilter. Though this change, the RuntimeFilter can apply the incoming 
> BF as soon as possible.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to