[ 
https://issues.apache.org/jira/browse/DRILL-6792?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16685888#comment-16685888
 ] 

ASF GitHub Bot commented on DRILL-6792:
---------------------------------------

sohami commented on a change in pull request #1504: DRILL-6792: Find the right 
probe side fragment wrapper & fix DrillBuf…
URL: https://github.com/apache/drill/pull/1504#discussion_r233207751
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
 ##########
 @@ -136,8 +143,8 @@ public void interrupt(final InterruptedException e) {
   private final AccountingUserConnection accountingUserConnection;
   /** Stores constants and their holders by type */
   private final Map<String, Map<MinorType, ValueHolder>> 
constantValueHolderCache;
-
-  private RuntimeFilterSink runtimeFilterSink;
+  private Map<Long, RuntimeFilterWritable> rfIdentifier2RFW = new 
ConcurrentHashMap<>();
+  private Map<Long, Boolean> rfIdentifier2fetched = new ConcurrentHashMap<>();
 
 Review comment:
   The extra decrease is because of this extra 
[retainBuffers](https://github.com/apache/drill/pull/1504/files#diff-11d71582cb7541a6ace7d9a1d7072c40R374)
 call. I think we can get rid of that `retainBuffer` call and then we don't 
have to call `release` twice for unconsumed RuntimeFilterWritable. Consider 
below flows:
   
   - `RuntimeFilterWritable` is received from Netty via `receiveRuntimeFilter` 
(ref count 1)
   - `retainBuffers` is called in receiveRuntimeFilter (ref count 2)
   - `AddRuntimeFilter` is called. Let's not increment the refCount here and 
just put the filter in the map `rfIdentifier2RFW` (so ref count is still 2)
   - Netty thread will call release (ref count 1)
   - Now there can be 2 cases:
   -- If filter is consumed by RTF operator then on close ref count will be 
decreased (ref count 0)
          For the consumed key RuntimeFilterWritable will be removed from map 
`rfIdentifier2RFW`
   -- If filter is not consumed then it will still be present in the map and 
`closeReceivedRFWs` will release the buffer (ref count 0)
   
   In another case:
   - `RuntimeFilterWritable` buffers are created by HashJoin operator (ref 
count 1)
   - `addRuntimeFilter` is called by HashJoin operator thread if sendToForeman 
is set to false. (ref count 1)
   -  Now there can be 2 cases:
   -- If filter is consumed by RTF operator then on close ref count will be 
decreased (ref count 0)
          For the consumed key RuntimeFilterWritable will be removed from map 
`rfIdentifier2RFW`
   -- If filter is not consumed then it will still be present in the map and 
`closeReceivedRFWs` will release the buffer (ref count 0)
   
   Based on above we can also get rid of `rfIdentifier2fetched` but has to be 
extra careful when looking for RuntimeFilterWritable in `rfIdentifier2RFW`. We 
have to call `containsKey()` to check first in `getRuntimeFilter` rather than 
`get` on `rfIdentifier2RFW`. Since the map allows null values get call can 
create a new key with null values. OR just implement `closeReceivedRFWs` to 
have check for null values.
   
   ```
   private void closeReceivedRFWs() {
       for (RuntimeFilterWritable runtimeFilterWritable : 
rfIdentifier2RFW.values()){
          if (runtimeFilterWritable == null) {
               continue;
          }
          long rfIdentifier = 
runtimeFilterWritable.getRuntimeFilterBDef().getRfIdentifier();
          runtimeFilterWritable.close();
       }
    }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Find the right probe side fragment to any storage plugin
> --------------------------------------------------------
>
>                 Key: DRILL-6792
>                 URL: https://issues.apache.org/jira/browse/DRILL-6792
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>            Reporter: weijie.tong
>            Assignee: weijie.tong
>            Priority: Major
>             Fix For: 1.15.0
>
>
> The current implementation of JPPD to find the probe side wrapper depends on 
> the GroupScan's digest. But there's no promise the GroupScan's digest will 
> not be changed since it is attached to the RuntimeFilterDef by different 
> storage plugin implementation logic.So here we assign a unique identifier to 
> the RuntimeFilter operator, and find the right probe side fragment wrapper by 
> the runtime filter identifier at the RuntimeFilterRouter class. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to