weijietong commented on a change in pull request #1459: DRILL-6731: Move the 
BFs aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221171069
 
 

 ##########
 File path: 
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
 ##########
 @@ -36,25 +40,63 @@
 
   private RuntimeFilterWritable aggregated = null;
 
-  private Queue<RuntimeFilterWritable> rfQueue = new ConcurrentLinkedQueue<>();
+  private BlockingQueue<RuntimeFilterWritable> rfQueue = new 
LinkedBlockingQueue<>();
 
   private AtomicBoolean running = new AtomicBoolean(true);
 
+  private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+  private Thread asyncAggregateThread;
+
+  private BufferAllocator bufferAllocator;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+  public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+    this.bufferAllocator = bufferAllocator;
+    AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+    asyncAggregateThread = new 
NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+    asyncAggregateThread.start();
+  }
+
   public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
-    rfQueue.add(runtimeFilterWritable);
-    if (currentBookId.get() == 0) {
-      AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
-      Thread asyncAggregateThread = new 
NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
-      asyncAggregateThread.start();
+    if (running.get()) {
+      if (containOne()) {
+        boolean same = aggregated.same(runtimeFilterWritable);
+        if (!same) {
+          //This is to solve the only one fragment case that two 
RuntimeFilterRecordBatchs
+          //share the same FragmentContext.
 
 Review comment:
   A direct example is this TPC-H SQL query:
   ```
   select l.l_orderkey, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue, 
o.o_orderdate, o.o_shippriority  
   from dfs.`/tpch-parquet/customer` c, dfs.`/tpch-parquet/orders` o, 
dfs.`/tpch-parquet/lineitem` l  
   where c.c_mktsegment = 'HOUSEHOLD' and c.c_custkey = o.o_custkey and 
l.l_orderkey = o.o_orderkey and o.o_orderdate < date '1995-03-25' and 
l.l_shipdate > date '1995-03-25'  
   group by l.l_orderkey, o.o_orderdate, o.o_shippriority 
   order by revenue desc, o.o_orderdate limit 10
   ```
   The corresponding plan is:
   ```
   
   00-00    Screen : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY 
o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = 
{4051714.1799999997 rows, 3.094535517999211E7 cpu, 0.0 io, 0.0 network, 
1.2986673920000002E7 memory}, id = 3423
   00-01      Project(l_orderkey=[$0], revenue=[$1], o_orderdate=[$2], 
o_shippriority=[$3]) : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY 
o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost = 
{4051713.1799999997 rows, 3.094535417999211E7 cpu, 0.0 io, 0.0 network, 
1.2986673920000002E7 memory}, id = 3422
   00-02        SelectionVectorRemover : rowType = RecordType(ANY l_orderkey, 
ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative 
cost = {4051703.1799999997 rows, 3.094531417999211E7 cpu, 0.0 io, 0.0 network, 
1.2986673920000002E7 memory}, id = 3421
   00-03          Limit(fetch=[10]) : rowType = RecordType(ANY l_orderkey, ANY 
revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost 
= {4051693.1799999997 rows, 3.094530417999211E7 cpu, 0.0 io, 0.0 network, 
1.2986673920000002E7 memory}, id = 3420
   00-04            SelectionVectorRemover : rowType = RecordType(ANY 
l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 
3002.8599999999997, cumulative cost = {4051683.1799999997 rows, 
3.094526417999211E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id 
= 3419
   00-05              TopN(limit=[10]) : rowType = RecordType(ANY l_orderkey, 
ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 
3002.8599999999997, cumulative cost = {4048680.32 rows, 3.094226131999211E7 
cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3418
   00-06                Project(l_orderkey=[$0], revenue=[$3], 
o_orderdate=[$1], o_shippriority=[$2]) : rowType = RecordType(ANY l_orderkey, 
ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 
3002.8599999999997, cumulative cost = {4045677.46 rows, 3.0862459040000003E7 
cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3417
   00-07                  HashAgg(group=[{0, 1, 2}], revenue=[SUM($3)]) : 
rowType = RecordType(ANY l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY 
revenue): rowcount = 3002.8599999999997, cumulative cost = {4042674.6 rows, 
3.08504476E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3416
   00-08                    HashAgg(group=[{0, 1, 2}], revenue=[SUM($3)]) : 
rowType = RecordType(ANY l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY 
revenue): rowcount = 30028.6, cumulative cost = {4012646.0 rows, 2.9769418E7 
cpu, 0.0 io, 0.0 network, 1.1929667200000001E7 memory}, id = 3415
   00-09                      Project(l_orderkey=[$6], o_orderdate=[$4], 
o_shippriority=[$5], $f3=[*($8, -(1, $9))]) : rowType = RecordType(ANY 
l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY $f3): rowcount = 300286.0, 
cumulative cost = {3712360.0 rows, 1.8959122E7 cpu, 0.0 io, 0.0 network, 
1359600.0 memory}, id = 3414
   00-10                        Project(c_mktsegment=[$8], c_custkey=[$9], 
o_custkey=[$4], o_orderkey=[$5], o_orderdate=[$6], o_shippriority=[$7], 
l_orderkey=[$0], l_shipdate=[$1], l_extendedprice=[$2], l_discount=[$3]) : 
rowType = RecordType(ANY c_mktsegment, ANY c_custkey, ANY o_custkey, ANY 
o_orderkey, ANY o_orderdate, ANY o_shippriority, ANY l_orderkey, ANY 
l_shipdate, ANY l_extendedprice, ANY l_discount): rowcount = 300286.0, 
cumulative cost = {3412074.0 rows, 1.685712E7 cpu, 0.0 io, 0.0 network, 
1359600.0 memory}, id = 3413
   00-11                          HashJoin(condition=[=($5, $0)], 
joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY l_shipdate, ANY 
l_extendedprice, ANY l_discount, ANY o_custkey, ANY o_orderkey, ANY 
o_orderdate, ANY o_shippriority, ANY c_mktsegment, ANY c_custkey): rowcount = 
300286.0, cumulative cost = {3111788.0 rows, 1.385426E7 cpu, 0.0 io, 0.0 
network, 1359600.0 memory}, id = 3412
   00-13                            SelectionVectorRemover : rowType = 
RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY 
l_discount): rowcount = 300286.0, cumulative cost = {2102002.0 rows, 6906578.0 
cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3403
   00-16                              Filter(condition=[>($1, 1995-03-25)]) : 
rowType = RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY 
l_discount): rowcount = 300286.0, cumulative cost = {1801716.0 rows, 6606292.0 
cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3402
   00-19                                RuntimeFilter : rowType = 
RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY 
l_discount): rowcount = 600572.0, cumulative cost = {1201144.0 rows, 3002860.0 
cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3401
   00-22                                  Scan(groupscan=[ParquetGroupScan 
[entries=[ReadEntryWithPath 
[path=hdfs://10.210.214.42:8020/tpch-parquet/lineitem]], 
selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/lineitem, numFiles=1, 
numRowGroups=1, usedMetadataFile=false, columns=[`l_orderkey`, `l_shipdate`, 
`l_extendedprice`, `l_discount`]]]) : rowType = RecordType(ANY l_orderkey, ANY 
l_shipdate, ANY l_extendedprice, ANY l_discount): rowcount = 600572.0, 
cumulative cost = {600572.0 rows, 2402288.0 cpu, 0.0 io, 0.0 network, 0.0 
memory}, id = 3400
   00-12                            HashJoin(condition=[=($5, $0)], 
joinType=[inner]) : rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY 
o_orderdate, ANY o_shippriority, ANY c_mktsegment, ANY c_custkey): rowcount = 
75000.0, cumulative cost = {634500.0 rows, 2744250.0 cpu, 0.0 io, 0.0 network, 
39600.0 memory}, id = 3411
   00-15                              SelectionVectorRemover : rowType = 
RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY o_shippriority): 
rowcount = 75000.0, cumulative cost = {525000.0 rows, 1725000.0 cpu, 0.0 io, 
0.0 network, 0.0 memory}, id = 3407
   00-18                                Filter(condition=[<($2, 1995-03-25)]) : 
rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY 
o_shippriority): rowcount = 75000.0, cumulative cost = {450000.0 rows, 
1650000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3406
   00-21                                  RuntimeFilter : rowType = 
RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY o_shippriority): 
rowcount = 150000.0, cumulative cost = {300000.0 rows, 750000.0 cpu, 0.0 io, 
0.0 network, 0.0 memory}, id = 3405
   00-23                                    Scan(groupscan=[ParquetGroupScan 
[entries=[ReadEntryWithPath 
[path=hdfs://10.210.214.42:8020/tpch-parquet/orders]], 
selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/orders, numFiles=1, 
numRowGroups=1, usedMetadataFile=false, columns=[`o_custkey`, `o_orderkey`, 
`o_orderdate`, `o_shippriority`]]]) : rowType = RecordType(ANY o_custkey, ANY 
o_orderkey, ANY o_orderdate, ANY o_shippriority): rowcount = 150000.0, 
cumulative cost = {150000.0 rows, 600000.0 cpu, 0.0 io, 0.0 network, 0.0 
memory}, id = 3404
   00-14                              SelectionVectorRemover : rowType = 
RecordType(ANY c_mktsegment, ANY c_custkey): rowcount = 2250.0, cumulative cost 
= {32250.0 rows, 101250.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3410
   00-17                                Filter(condition=[=($0, 'HOUSEHOLD')]) 
: rowType = RecordType(ANY c_mktsegment, ANY c_custkey): rowcount = 2250.0, 
cumulative cost = {30000.0 rows, 99000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, 
id = 3409
   00-20                                  Scan(groupscan=[ParquetGroupScan 
[entries=[ReadEntryWithPath 
[path=hdfs://10.210.214.42:8020/tpch-parquet/customer]], 
selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/customer, numFiles=1, 
numRowGroups=1, usedMetadataFile=false, columns=[`c_mktsegment`, 
`c_custkey`]]]) : rowType  = RecordType(ANY c_mktsegment, ANY c_custkey): 
rowcount = 15000.0, cumulative cost = {15000.0 rows, 30000.0 cpu, 0.0 io, 0.0 
network, 0.0 memory}, id = 3408
   ```
   
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to