weijietong commented on a change in pull request #1459: DRILL-6731: Move the
BFs aggregating work from the Foreman to the RuntimeFi…
URL: https://github.com/apache/drill/pull/1459#discussion_r221171069
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
##########
@@ -36,25 +40,63 @@
private RuntimeFilterWritable aggregated = null;
- private Queue<RuntimeFilterWritable> rfQueue = new ConcurrentLinkedQueue<>();
+ private BlockingQueue<RuntimeFilterWritable> rfQueue = new
LinkedBlockingQueue<>();
private AtomicBoolean running = new AtomicBoolean(true);
+ private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+ private Thread asyncAggregateThread;
+
+ private BufferAllocator bufferAllocator;
+
+ private static final Logger logger =
LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+ public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+ this.bufferAllocator = bufferAllocator;
+ AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+ asyncAggregateThread = new
NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+ asyncAggregateThread.start();
+ }
+
public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
- rfQueue.add(runtimeFilterWritable);
- if (currentBookId.get() == 0) {
- AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
- Thread asyncAggregateThread = new
NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
- asyncAggregateThread.start();
+ if (running.get()) {
+ if (containOne()) {
+ boolean same = aggregated.same(runtimeFilterWritable);
+ if (!same) {
+ //This is to solve the only one fragment case that two
RuntimeFilterRecordBatchs
+ //share the same FragmentContext.
Review comment:
A direct example is this TPC-H SQL query:
```
select l.l_orderkey, sum(l.l_extendedprice * (1 - l.l_discount)) as revenue,
o.o_orderdate, o.o_shippriority
from dfs.`/tpch-parquet/customer` c, dfs.`/tpch-parquet/orders` o,
dfs.`/tpch-parquet/lineitem` l
where c.c_mktsegment = 'HOUSEHOLD' and c.c_custkey = o.o_custkey and
l.l_orderkey = o.o_orderkey and o.o_orderdate < date '1995-03-25' and
l.l_shipdate > date '1995-03-25'
group by l.l_orderkey, o.o_orderdate, o.o_shippriority
order by revenue desc, o.o_orderdate limit 10
```
The corresponding plan is:
```
00-00 Screen : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY
o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost =
{4051714.1799999997 rows, 3.094535517999211E7 cpu, 0.0 io, 0.0 network,
1.2986673920000002E7 memory}, id = 3423
00-01 Project(l_orderkey=[$0], revenue=[$1], o_orderdate=[$2],
o_shippriority=[$3]) : rowType = RecordType(ANY l_orderkey, ANY revenue, ANY
o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost =
{4051713.1799999997 rows, 3.094535417999211E7 cpu, 0.0 io, 0.0 network,
1.2986673920000002E7 memory}, id = 3422
00-02 SelectionVectorRemover : rowType = RecordType(ANY l_orderkey,
ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative
cost = {4051703.1799999997 rows, 3.094531417999211E7 cpu, 0.0 io, 0.0 network,
1.2986673920000002E7 memory}, id = 3421
00-03 Limit(fetch=[10]) : rowType = RecordType(ANY l_orderkey, ANY
revenue, ANY o_orderdate, ANY o_shippriority): rowcount = 10.0, cumulative cost
= {4051693.1799999997 rows, 3.094530417999211E7 cpu, 0.0 io, 0.0 network,
1.2986673920000002E7 memory}, id = 3420
00-04 SelectionVectorRemover : rowType = RecordType(ANY
l_orderkey, ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount =
3002.8599999999997, cumulative cost = {4051683.1799999997 rows,
3.094526417999211E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id
= 3419
00-05 TopN(limit=[10]) : rowType = RecordType(ANY l_orderkey,
ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount =
3002.8599999999997, cumulative cost = {4048680.32 rows, 3.094226131999211E7
cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3418
00-06 Project(l_orderkey=[$0], revenue=[$3],
o_orderdate=[$1], o_shippriority=[$2]) : rowType = RecordType(ANY l_orderkey,
ANY revenue, ANY o_orderdate, ANY o_shippriority): rowcount =
3002.8599999999997, cumulative cost = {4045677.46 rows, 3.0862459040000003E7
cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3417
00-07 HashAgg(group=[{0, 1, 2}], revenue=[SUM($3)]) :
rowType = RecordType(ANY l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY
revenue): rowcount = 3002.8599999999997, cumulative cost = {4042674.6 rows,
3.08504476E7 cpu, 0.0 io, 0.0 network, 1.2986673920000002E7 memory}, id = 3416
00-08 HashAgg(group=[{0, 1, 2}], revenue=[SUM($3)]) :
rowType = RecordType(ANY l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY
revenue): rowcount = 30028.6, cumulative cost = {4012646.0 rows, 2.9769418E7
cpu, 0.0 io, 0.0 network, 1.1929667200000001E7 memory}, id = 3415
00-09 Project(l_orderkey=[$6], o_orderdate=[$4],
o_shippriority=[$5], $f3=[*($8, -(1, $9))]) : rowType = RecordType(ANY
l_orderkey, ANY o_orderdate, ANY o_shippriority, ANY $f3): rowcount = 300286.0,
cumulative cost = {3712360.0 rows, 1.8959122E7 cpu, 0.0 io, 0.0 network,
1359600.0 memory}, id = 3414
00-10 Project(c_mktsegment=[$8], c_custkey=[$9],
o_custkey=[$4], o_orderkey=[$5], o_orderdate=[$6], o_shippriority=[$7],
l_orderkey=[$0], l_shipdate=[$1], l_extendedprice=[$2], l_discount=[$3]) :
rowType = RecordType(ANY c_mktsegment, ANY c_custkey, ANY o_custkey, ANY
o_orderkey, ANY o_orderdate, ANY o_shippriority, ANY l_orderkey, ANY
l_shipdate, ANY l_extendedprice, ANY l_discount): rowcount = 300286.0,
cumulative cost = {3412074.0 rows, 1.685712E7 cpu, 0.0 io, 0.0 network,
1359600.0 memory}, id = 3413
00-11 HashJoin(condition=[=($5, $0)],
joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY l_shipdate, ANY
l_extendedprice, ANY l_discount, ANY o_custkey, ANY o_orderkey, ANY
o_orderdate, ANY o_shippriority, ANY c_mktsegment, ANY c_custkey): rowcount =
300286.0, cumulative cost = {3111788.0 rows, 1.385426E7 cpu, 0.0 io, 0.0
network, 1359600.0 memory}, id = 3412
00-13 SelectionVectorRemover : rowType =
RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY
l_discount): rowcount = 300286.0, cumulative cost = {2102002.0 rows, 6906578.0
cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3403
00-16 Filter(condition=[>($1, 1995-03-25)]) :
rowType = RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY
l_discount): rowcount = 300286.0, cumulative cost = {1801716.0 rows, 6606292.0
cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3402
00-19 RuntimeFilter : rowType =
RecordType(ANY l_orderkey, ANY l_shipdate, ANY l_extendedprice, ANY
l_discount): rowcount = 600572.0, cumulative cost = {1201144.0 rows, 3002860.0
cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3401
00-22 Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath
[path=hdfs://10.210.214.42:8020/tpch-parquet/lineitem]],
selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/lineitem, numFiles=1,
numRowGroups=1, usedMetadataFile=false, columns=[`l_orderkey`, `l_shipdate`,
`l_extendedprice`, `l_discount`]]]) : rowType = RecordType(ANY l_orderkey, ANY
l_shipdate, ANY l_extendedprice, ANY l_discount): rowcount = 600572.0,
cumulative cost = {600572.0 rows, 2402288.0 cpu, 0.0 io, 0.0 network, 0.0
memory}, id = 3400
00-12 HashJoin(condition=[=($5, $0)],
joinType=[inner]) : rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY
o_orderdate, ANY o_shippriority, ANY c_mktsegment, ANY c_custkey): rowcount =
75000.0, cumulative cost = {634500.0 rows, 2744250.0 cpu, 0.0 io, 0.0 network,
39600.0 memory}, id = 3411
00-15 SelectionVectorRemover : rowType =
RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY o_shippriority):
rowcount = 75000.0, cumulative cost = {525000.0 rows, 1725000.0 cpu, 0.0 io,
0.0 network, 0.0 memory}, id = 3407
00-18 Filter(condition=[<($2, 1995-03-25)]) :
rowType = RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY
o_shippriority): rowcount = 75000.0, cumulative cost = {450000.0 rows,
1650000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3406
00-21 RuntimeFilter : rowType =
RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY o_shippriority):
rowcount = 150000.0, cumulative cost = {300000.0 rows, 750000.0 cpu, 0.0 io,
0.0 network, 0.0 memory}, id = 3405
00-23 Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath
[path=hdfs://10.210.214.42:8020/tpch-parquet/orders]],
selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/orders, numFiles=1,
numRowGroups=1, usedMetadataFile=false, columns=[`o_custkey`, `o_orderkey`,
`o_orderdate`, `o_shippriority`]]]) : rowType = RecordType(ANY o_custkey, ANY
o_orderkey, ANY o_orderdate, ANY o_shippriority): rowcount = 150000.0,
cumulative cost = {150000.0 rows, 600000.0 cpu, 0.0 io, 0.0 network, 0.0
memory}, id = 3404
00-14 SelectionVectorRemover : rowType =
RecordType(ANY c_mktsegment, ANY c_custkey): rowcount = 2250.0, cumulative cost
= {32250.0 rows, 101250.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3410
00-17 Filter(condition=[=($0, 'HOUSEHOLD')])
: rowType = RecordType(ANY c_mktsegment, ANY c_custkey): rowcount = 2250.0,
cumulative cost = {30000.0 rows, 99000.0 cpu, 0.0 io, 0.0 network, 0.0 memory},
id = 3409
00-20 Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath
[path=hdfs://10.210.214.42:8020/tpch-parquet/customer]],
selectionRoot=hdfs://10.210.214.42:8020/tpch-parquet/customer, numFiles=1,
numRowGroups=1, usedMetadataFile=false, columns=[`c_mktsegment`,
`c_custkey`]]]) : rowType = RecordType(ANY c_mktsegment, ANY c_custkey):
rowcount = 15000.0, cumulative cost = {15000.0 rows, 30000.0 cpu, 0.0 io, 0.0
network, 0.0 memory}, id = 3408
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services