[ https://issues.apache.org/jira/browse/DRILL-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16686070#comment-16686070 ]

ASF GitHub Bot commented on DRILL-6792:
---------------------------------------

weijietong commented on a change in pull request #1504: DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r233307957
 
 

 ##########
 File path: exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##########
 @@ -17,206 +17,218 @@
  */
 package org.apache.drill.exec.work.filter;
 
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, and broadcasts the final aggregated
+ * one to the RuntimeFilterRecordBatch.
  */
-public class RuntimeFilterSink implements AutoCloseable {
-
-  private AtomicInteger currentBookId = new AtomicInteger(0);
-
-  private int staleBookId = 0;
-
-  /**
-   * RuntimeFilterWritable holding the aggregated version of all the received filter
-   */
-  private RuntimeFilterWritable aggregated = null;
+public class RuntimeFilterSink
+{
 
   private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
 
-  /**
-   * Flag used by Minor Fragment thread to indicate it has encountered error
-   */
-  private AtomicBoolean running = new AtomicBoolean(true);
-
-  /**
-   * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
-   * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
-   * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
-   * indicate producer not to put any new elements in it.
-   */
   private ReentrantLock queueLock = new ReentrantLock();
 
   private Condition notEmpty = queueLock.newCondition();
 
-  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+  private Map<Integer, Integer> joinMjId2rfNumber;
+
+  //HashJoin node's major fragment id to its corresponding probe side nodes' endpoints
+  private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
+
+  //HashJoin node's major fragment id to the major fragment id that its probe side scan node belongs to
+  private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
 
-  private BufferAllocator bufferAllocator;
+  //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+  private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<>();
+  //for debug usage
+  private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<>();
 
-  private Future future;
+  private DrillbitContext drillbitContext;
+
+  private SendingAccountor sendingAccountor;
 
   private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
 
 
-  public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
-    this.bufferAllocator = bufferAllocator;
+  public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor)
+  {
+    this.drillbitContext = drillbitContext;
+    this.sendingAccountor = sendingAccountor;
     AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-    future = executorService.submit(asyncAggregateWorker);
+    drillbitContext.getExecutor().submit(asyncAggregateWorker);
   }
 
-  public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    if (running.get()) {
-      try {
-        aggregatedRFLock.lock();
-        if (containOne()) {
-          boolean same = aggregated.equals(runtimeFilterWritable);
-          if (!same) {
-            // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
-            // share the same FragmentContext.
-            aggregated.close();
-            currentBookId.set(0);
-            staleBookId = 0;
-            clearQueued(false);
-          }
-        }
-      } finally {
-        aggregatedRFLock.unlock();
-      }
-
-      try {
-        queueLock.lock();
-        if (rfQueue != null) {
-          rfQueue.add(runtimeFilterWritable);
-          notEmpty.signal();
-        } else {
-          runtimeFilterWritable.close();
-        }
-      } finally {
-        queueLock.unlock();
-      }
-    } else {
-      runtimeFilterWritable.close();
+  public void add(RuntimeFilterWritable runtimeFilterWritable)
+  {
+    runtimeFilterWritable.retainBuffers(1);
+    int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
+    if (joinMjId2Stopwatch.get(joinMjId) == null) {
+      Stopwatch stopwatch = Stopwatch.createStarted();
+      joinMjId2Stopwatch.put(joinMjId, stopwatch);
     }
-  }
-
-  public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
+    queueLock.lock();
     try {
-      aggregatedRFLock.lock();
-      return aggregated.duplicate(bufferAllocator);
-    } finally {
-      aggregatedRFLock.unlock();
+      rfQueue.add(runtimeFilterWritable);
+      notEmpty.signal();
     }
-  }
 
 Review comment:
   I haven't seen any performance improvement from this, but it does enlarge the lock's critical section by pulling the heavy aggregation work inside the locked range.
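   To make the critical-section concern concrete, here is a minimal sketch (plain Java, not the PR's code; Filter and merge() are placeholders standing in for RuntimeFilterWritable and its aggregation step) in which only the queue hand-off happens under the lock and the heavy merge work runs after the lock is released:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class NarrowLockAggregator {

  // Placeholder for RuntimeFilterWritable; merge() stands in for the heavy aggregation step.
  static class Filter {
    final int joinMjId;
    Filter(int joinMjId) { this.joinMjId = joinMjId; }
    void merge(Filter other) { /* expensive bloom filter OR-ing would happen here */ }
  }

  private final BlockingQueue<Filter> queue = new LinkedBlockingQueue<>();
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition notEmpty = lock.newCondition();
  private final Map<Integer, Filter> aggregatedByJoin = new HashMap<>();

  // Producer side (netty thread): only the enqueue and the signal run under the lock.
  void add(Filter filter) {
    lock.lock();
    try {
      queue.add(filter);
      notEmpty.signal();
    } finally {
      lock.unlock();
    }
  }

  // Consumer side (async aggregate worker): drain under the lock, merge outside it.
  void aggregateLoop() throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      List<Filter> drained = new ArrayList<>();
      lock.lock();
      try {
        while (queue.isEmpty()) {
          notEmpty.await();
        }
        queue.drainTo(drained);
      } finally {
        lock.unlock();
      }
      // The expensive merge work runs with the lock already released.
      for (Filter filter : drained) {
        aggregatedByJoin.merge(filter.joinMjId, filter, (acc, next) -> { acc.merge(next); return acc; });
      }
    }
  }
}

   The trade-off is an extra drain copy per wake-up, but producers on the netty threads never block behind the aggregation itself.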

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Find the right probe side fragment to any storage plugin
> --------------------------------------------------------
>
>                 Key: DRILL-6792
>                 URL: https://issues.apache.org/jira/browse/DRILL-6792
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>             Fix For: 1.15.0
>
>
> The current implementation of JPPD finds the probe side wrapper through the 
> GroupScan's digest. But there is no guarantee that the GroupScan's digest 
> stays unchanged, since it is attached to the RuntimeFilterDef by each storage 
> plugin's own implementation logic. So here we assign a unique identifier to 
> the RuntimeFilter operator and find the right probe side fragment wrapper by 
> that runtime filter identifier in the RuntimeFilterRouter class.
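
For illustration, a rough sketch of the identifier-based routing the description suggests (class and field names below are placeholders, not Drill's actual API): each RuntimeFilter operator gets a unique identifier, and the router resolves the probe side fragment wrapper by that identifier rather than by the GroupScan digest.

import java.util.HashMap;
import java.util.Map;

class RuntimeFilterRouting {

  // Placeholder for the probe side fragment wrapper the router hands the filter to.
  static class Wrapper {
    final int majorFragmentId;
    Wrapper(int majorFragmentId) { this.majorFragmentId = majorFragmentId; }
  }

  // runtime filter identifier -> probe side fragment wrapper, populated while visiting
  // the physical plan, instead of keying the lookup on the GroupScan's digest
  private final Map<Integer, Wrapper> rfIdentifier2Wrapper = new HashMap<>();

  void register(int rfIdentifier, Wrapper probeSideWrapper) {
    rfIdentifier2Wrapper.put(rfIdentifier, probeSideWrapper);
  }

  Wrapper findProbeSideWrapper(int rfIdentifier) {
    Wrapper wrapper = rfIdentifier2Wrapper.get(rfIdentifier);
    if (wrapper == null) {
      throw new IllegalStateException("no probe side fragment registered for runtime filter " + rfIdentifier);
    }
    return wrapper;
  }
}

Because the identifier is assigned by the planner itself, the lookup stays stable no matter how a storage plugin happens to build its GroupScan digest.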



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
