[
https://issues.apache.org/jira/browse/DRILL-1162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16093266#comment-16093266
]
Volodymyr Vysotskyi commented on DRILL-1162:
--------------------------------------------
I have reproduced this issue using data from \[1\]. Drill was run in embedded
mode with an 8 GB direct memory limit and with
{{`planner.width.max_per_node`=1}}. Besides the query from the description,
I also got {{out of direct memory}} with a smaller number of joins:
{code:sql}
select count(*) from `lineitem1.parquet` a
inner join `part.parquet` j on a.l_partkey = j.p_partkey
inner join `orders.parquet` k on a.l_orderkey = k.o_orderkey
inner join `supplier.parquet` l on a.l_suppkey = l.s_suppkey
inner join `partsupp.parquet` m on j.p_partkey = m.ps_partkey and l.s_suppkey =
m.ps_suppkey
inner join `customer.parquet` n on k.o_custkey = n.c_custkey
inner join `lineitem2.parquet` b on a.l_orderkey = b.l_orderkey
inner join `lineitem2.parquet` c on a.l_partkey = c.l_partkey
inner join `lineitem2.parquet` e on a.l_extendedprice = e.l_extendedprice
inner join `lineitem2.parquet` f on a.l_comment = f.l_comment
inner join `lineitem2.parquet` g on a.l_shipdate = g.l_shipdate
inner join `lineitem2.parquet` h on a.l_commitdate = h.l_commitdate;
{code}
Log excerpt with the error:
{noformat}
2017-07-19 15:28:30,930 [269085ac-548b-3a86-15e6-15f2ef338acf:frag:0:0] INFO  o.a.d.e.w.fragment.FragmentExecutor - User Error Occurred: One or more nodes ran out of memory while executing the query. (Failure allocating buffer.)
org.apache.drill.common.exceptions.UserException: RESOURCE ERROR: One or more nodes ran out of memory while executing the query.
Failure allocating buffer.

[Error Id: 06291256-412f-40a8-87f7-4e422953c591 ]
        at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:550) ~[drill-common-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:244) [drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38) [drill-common-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_80]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_80]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_80]
Caused by: org.apache.drill.exec.exception.OutOfMemoryException: Failure allocating buffer.
        at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:64) ~[drill-memory-base-1.11.0-SNAPSHOT.jar:4.0.27.Final]
        at org.apache.drill.exec.memory.AllocationManager.<init>(AllocationManager.java:80) ~[drill-memory-base-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:260) ~[drill-memory-base-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.memory.BaseAllocator.buffer(BaseAllocator.java:243) ~[drill-memory-base-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.memory.BaseAllocator.buffer(BaseAllocator.java:213) ~[drill-memory-base-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.vector.DateVector.allocateBytes(DateVector.java:231) ~[vector-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.vector.DateVector.allocateNewSafe(DateVector.java:193) ~[vector-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.vector.DateVector.allocateNew(DateVector.java:176) ~[vector-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.physical.impl.join.HashJoinBatch.allocateVectors(HashJoinBatch.java:502) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.physical.impl.join.HashJoinBatch.innerNext(HashJoinBatch.java:235) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:109) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.record.AbstractSingleRecordBatch.innerNext(AbstractSingleRecordBatch.java:51) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.physical.impl.project.ProjectRecordBatch.innerNext(ProjectRecordBatch.java:133) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:119) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.physical.impl.join.HashJoinBatch.executeBuildPhase(HashJoinBatch.java:411) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.physical.impl.join.HashJoinBatch.innerNext(HashJoinBatch.java:222) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
        at org.apache.drill.exec.record.AbstractRecordBatch.next(AbstractRecordBatch.java:162) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
{noformat}
The stack trace contains the line
{noformat}
        at org.apache.drill.exec.physical.impl.join.HashJoinBatch.executeBuildPhase(HashJoinBatch.java:411) ~[drill-java-exec-1.11.0-SNAPSHOT.jar:1.11.0-SNAPSHOT]
{noformat}
so I agree that the error appears while Drill tries to build the hash table for
a join.
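As a sketch (not Drill's actual implementation), an in-memory hash join materializes its entire build-side input into a hash table before reading any probe rows, which is why its memory footprint is proportional to the build side's real, not estimated, row count:

```python
# Minimal sketch of the two phases of an in-memory hash join, assuming
# dict-based rows; this is illustrative only, not Drill's code.

def hash_join(probe_rows, build_rows, probe_key, build_key):
    # Build phase: every build-side row is held in memory, keyed by the
    # join key. If the build side is far larger than estimated, this table
    # is what exhausts memory.
    table = {}
    for row in build_rows:
        table.setdefault(row[build_key], []).append(row)
    # Probe phase: stream probe rows, emitting one output row per match.
    out = []
    for row in probe_rows:
        for match in table.get(row[probe_key], []):
            out.append({**row, **match})
    return out
```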
Analysing the plan for this query:
{noformat}
00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {1617750.1 rows, 1.36250751E7 cpu, 0.0 io, 0.0 network, 6893040.0 memory}, id = 50635
00-01   Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {1617750.0 rows, 1.3625075E7 cpu, 0.0 io, 0.0 network, 6893040.0 memory}, id = 50634
00-02     StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {1617750.0 rows, 1.3625075E7 cpu, 0.0 io, 0.0 network, 6893040.0 memory}, id = 50633
00-03       Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 60175.0, cumulative cost = {1557575.0 rows, 1.2902975E7 cpu, 0.0 io, 0.0 network, 6893040.0 memory}, id = 50632
00-04         HashJoin(condition=[=($0, $1)], joinType=[inner]) : rowType = RecordType(ANY l_commitdate, ANY l_commitdate0): rowcount = 60175.0, cumulative cost = {1497400.0 rows, 1.2662275E7 cpu, 0.0 io, 0.0 network, 6893040.0 memory}, id = 50631
00-05           Project(l_commitdate0=[$0]) : rowType = RecordType(ANY l_commitdate0): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50630
00-07             Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/lineitem2.parquet]], selectionRoot=file:/tmp/lineitem2.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_commitdate`]]]) : rowType = RecordType(ANY l_commitdate): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50629
00-06           Project(l_commitdate=[$1]) : rowType = RecordType(ANY l_commitdate): rowcount = 60175.0, cumulative cost = {1316875.0 rows, 1.13986E7 cpu, 0.0 io, 0.0 network, 5833960.0 memory}, id = 50628
00-08             HashJoin(condition=[=($0, $2)], joinType=[inner]) : rowType = RecordType(ANY l_shipdate, ANY l_commitdate, ANY l_shipdate0): rowcount = 60175.0, cumulative cost = {1316875.0 rows, 1.13986E7 cpu, 0.0 io, 0.0 network, 5833960.0 memory}, id = 50627
00-09               Project(l_shipdate0=[$0]) : rowType = RecordType(ANY l_shipdate0): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50626
00-11                 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/lineitem2.parquet]], selectionRoot=file:/tmp/lineitem2.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_shipdate`]]]) : rowType = RecordType(ANY l_shipdate): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50625
00-10               Project(l_shipdate=[$1], l_commitdate=[$2]) : rowType = RecordType(ANY l_shipdate, ANY l_commitdate): rowcount = 60175.0, cumulative cost = {1136350.0 rows, 1.0134925E7 cpu, 0.0 io, 0.0 network, 4774880.0 memory}, id = 50624
00-12                 HashJoin(condition=[=($0, $3)], joinType=[inner]) : rowType = RecordType(ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY l_comment0): rowcount = 60175.0, cumulative cost = {1136350.0 rows, 1.0134925E7 cpu, 0.0 io, 0.0 network, 4774880.0 memory}, id = 50623
00-13                   Project(l_comment0=[$0]) : rowType = RecordType(ANY l_comment0): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50622
00-15                     Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/lineitem2.parquet]], selectionRoot=file:/tmp/lineitem2.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_comment`]]]) : rowType = RecordType(ANY l_comment): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50621
00-14                   Project(l_comment=[$1], l_shipdate=[$2], l_commitdate=[$3]) : rowType = RecordType(ANY l_comment, ANY l_shipdate, ANY l_commitdate): rowcount = 60175.0, cumulative cost = {955825.0 rows, 8871250.0 cpu, 0.0 io, 0.0 network, 3715800.0 memory}, id = 50620
00-16                     HashJoin(condition=[=($0, $4)], joinType=[inner]) : rowType = RecordType(ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY l_extendedprice0): rowcount = 60175.0, cumulative cost = {955825.0 rows, 8871250.0 cpu, 0.0 io, 0.0 network, 3715800.0 memory}, id = 50619
00-17                       Project(l_extendedprice0=[$0]) : rowType = RecordType(ANY l_extendedprice0): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50618
00-19                         Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/lineitem2.parquet]], selectionRoot=file:/tmp/lineitem2.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_extendedprice`]]]) : rowType = RecordType(ANY l_extendedprice): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50617
00-18                       Project(l_extendedprice=[$1], l_comment=[$2], l_shipdate=[$3], l_commitdate=[$4]) : rowType = RecordType(ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate): rowcount = 60175.0, cumulative cost = {775300.0 rows, 7607575.0 cpu, 0.0 io, 0.0 network, 2656720.0 memory}, id = 50616
00-20                         HashJoin(condition=[=($0, $5)], joinType=[inner]) : rowType = RecordType(ANY l_partkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY l_partkey0): rowcount = 60175.0, cumulative cost = {775300.0 rows, 7607575.0 cpu, 0.0 io, 0.0 network, 2656720.0 memory}, id = 50615
00-21                           Project(l_partkey0=[$0]) : rowType = RecordType(ANY l_partkey0): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50614
00-23                             Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/lineitem2.parquet]], selectionRoot=file:/tmp/lineitem2.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_partkey`]]]) : rowType = RecordType(ANY l_partkey): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50613
00-22                           Project(l_partkey=[$0], l_extendedprice=[$2], l_comment=[$3], l_shipdate=[$4], l_commitdate=[$5]) : rowType = RecordType(ANY l_partkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate): rowcount = 60175.0, cumulative cost = {594775.0 rows, 6343900.0 cpu, 0.0 io, 0.0 network, 1597640.0 memory}, id = 50612
00-24                             HashJoin(condition=[=($1, $6)], joinType=[inner]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY l_orderkey0): rowcount = 60175.0, cumulative cost = {594775.0 rows, 6343900.0 cpu, 0.0 io, 0.0 network, 1597640.0 memory}, id = 50611
00-25                               Project(l_orderkey0=[$0]) : rowType = RecordType(ANY l_orderkey0): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50610
00-27                                 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/lineitem2.parquet]], selectionRoot=file:/tmp/lineitem2.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_orderkey`]]]) : rowType = RecordType(ANY l_orderkey): rowcount = 60175.0, cumulative cost = {60175.0 rows, 60175.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50609
00-26                               Project(l_partkey=[$0], l_orderkey=[$1], l_extendedprice=[$2], l_comment=[$3], l_shipdate=[$4], l_commitdate=[$5]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate): rowcount = 60175.0, cumulative cost = {414250.0 rows, 5080225.0 cpu, 0.0 io, 0.0 network, 538560.0 memory}, id = 50608
00-28                                 HashJoin(condition=[=($6, $7)], joinType=[inner]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY o_custkey, ANY c_custkey): rowcount = 60175.0, cumulative cost = {414250.0 rows, 5080225.0 cpu, 0.0 io, 0.0 network, 538560.0 memory}, id = 50607
00-30                                   Project(l_partkey=[$0], l_orderkey=[$1], l_extendedprice=[$2], l_comment=[$3], l_shipdate=[$4], l_commitdate=[$5], o_custkey=[$7]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY o_custkey): rowcount = 60175.0, cumulative cost = {351075.0 rows, 4344625.0 cpu, 0.0 io, 0.0 network, 512160.0 memory}, id = 50605
00-31                                     HashJoin(condition=[AND(=($6, $9), =($8, $10))], joinType=[inner]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY p_partkey, ANY o_custkey, ANY s_suppkey, ANY ps_partkey, ANY ps_suppkey): rowcount = 60175.0, cumulative cost = {351075.0 rows, 4344625.0 cpu, 0.0 io, 0.0 network, 512160.0 memory}, id = 50604
00-33                                       Project(l_partkey=[$0], l_orderkey=[$1], l_extendedprice=[$3], l_comment=[$4], l_shipdate=[$5], l_commitdate=[$6], p_partkey=[$7], o_custkey=[$8], s_suppkey=[$9]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY p_partkey, ANY o_custkey, ANY s_suppkey): rowcount = 60175.0, cumulative cost = {274900.0 rows, 2756425.0 cpu, 0.0 io, 0.0 network, 300960.0 memory}, id = 50602
00-34                                         HashJoin(condition=[=($2, $9)], joinType=[inner]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY p_partkey, ANY o_custkey, ANY s_suppkey): rowcount = 60175.0, cumulative cost = {274900.0 rows, 2756425.0 cpu, 0.0 io, 0.0 network, 300960.0 memory}, id = 50601
00-36                                           Project(l_partkey=[$0], l_orderkey=[$1], l_suppkey=[$2], l_extendedprice=[$3], l_comment=[$4], l_shipdate=[$5], l_commitdate=[$6], p_partkey=[$7], o_custkey=[$9]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY p_partkey, ANY o_custkey): rowcount = 60175.0, cumulative cost = {214525.0 rows, 2033425.0 cpu, 0.0 io, 0.0 network, 299200.0 memory}, id = 50599
00-37                                             HashJoin(condition=[=($1, $8)], joinType=[inner]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY p_partkey, ANY o_orderkey, ANY o_custkey): rowcount = 60175.0, cumulative cost = {214525.0 rows, 2033425.0 cpu, 0.0 io, 0.0 network, 299200.0 memory}, id = 50598
00-39                                               HashJoin(condition=[=($0, $7)], joinType=[inner]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate, ANY p_partkey): rowcount = 60175.0, cumulative cost = {124350.0 rows, 1161325.0 cpu, 0.0 io, 0.0 network, 35200.0 memory}, id = 50596
00-41                                                 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/lineitem1.parquet]], selectionRoot=file:/tmp/lineitem1.parquet, numFiles=1, usedMetadataFile=false, columns=[`l_partkey`, `l_orderkey`, `l_suppkey`, `l_extendedprice`, `l_comment`, `l_shipdate`, `l_commitdate`]]]) : rowType = RecordType(ANY l_partkey, ANY l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_comment, ANY l_shipdate, ANY l_commitdate): rowcount = 60175.0, cumulative cost = {60175.0 rows, 421225.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50594
00-40                                                 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/part.parquet]], selectionRoot=file:/tmp/part.parquet, numFiles=1, usedMetadataFile=false, columns=[`p_partkey`]]]) : rowType = RecordType(ANY p_partkey): rowcount = 2000.0, cumulative cost = {2000.0 rows, 2000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50595
00-38                                               Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/orders.parquet]], selectionRoot=file:/tmp/orders.parquet, numFiles=1, usedMetadataFile=false, columns=[`o_orderkey`, `o_custkey`]]]) : rowType = RecordType(ANY o_orderkey, ANY o_custkey): rowcount = 15000.0, cumulative cost = {15000.0 rows, 30000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50597
00-35                                           Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/supplier.parquet]], selectionRoot=file:/tmp/supplier.parquet, numFiles=1, usedMetadataFile=false, columns=[`s_suppkey`]]]) : rowType = RecordType(ANY s_suppkey): rowcount = 100.0, cumulative cost = {100.0 rows, 100.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50600
00-32                                       Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/partsupp.parquet]], selectionRoot=file:/tmp/partsupp.parquet, numFiles=1, usedMetadataFile=false, columns=[`ps_partkey`, `ps_suppkey`]]]) : rowType = RecordType(ANY ps_partkey, ANY ps_suppkey): rowcount = 8000.0, cumulative cost = {8000.0 rows, 16000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50603
00-29                                 Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/customer.parquet]], selectionRoot=file:/tmp/customer.parquet, numFiles=1, usedMetadataFile=false, columns=[`c_custkey`]]]) : rowType = RecordType(ANY c_custkey): rowcount = 1500.0, cumulative cost = {1500.0 rows, 1500.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 50606
{noformat}
I found that in many cases the right (build) input of a hash join is more
complex than the left one. The planner estimates the right input's row count to
be the same as the left input's, but that is only an estimate. The actual
number of rows grows with each join, since the table joins itself on fields
that contain many identical values. As a result, the right input actually
returns far more rows than the left one, and the row count becomes so large
that there is not enough direct memory to build the hash table.
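A toy illustration (with hypothetical data) of why a self-join on a column with repeated values inflates the row count: a key value that occurs k times matches itself k * k times, so every additional self-join multiplies the output again, far beyond the planner's per-join estimate:

```python
from collections import Counter

def self_join_count(rows, key):
    """Count output rows of an inner self-join of `rows` on `key`."""
    counts = Counter(r[key] for r in rows)
    # Each key value with k occurrences contributes k * k matched pairs.
    return sum(k * k for k in counts.values())

# Hypothetical sample: 5 rows with only 2 distinct l_shipdate values.
rows = [{"l_shipdate": d} for d in ["1996-01-01"] * 3 + ["1996-01-02"] * 2]
# One self-join already turns 5 input rows into 3*3 + 2*2 = 13 output rows.
```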
I found that the left and right operators are swapped in the method
[getPhysicalOperator()|https://github.com/apache/drill/blob/3e8b01d5b0d3013e3811913f0fd6028b22c1ac3f/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java#L80].
This swap was added in DRILL-2236.
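A hypothetical sketch (function and parameter names are illustrative, not Drill's actual code) of the kind of estimate-driven swap decision made around HashJoinPrel.getPhysicalOperator(): place the input with the smaller *estimated* row count on the build side. When estimates are equal, as in the plan above where every join reports rowcount = 60175.0, the decision can put the input that actually produces far more rows on the build side:

```python
def choose_build_side(left_est_rows, right_est_rows):
    """Pick the build side using estimated row counts only.

    Ties go to the right side, mirroring the fact that a swap based purely
    on equal estimates gives the planner no real information to act on.
    """
    return "left" if left_est_rows < right_est_rows else "right"
```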
With the option disabled:
{code:sql}
alter session set `planner.enable_hashjoin_swap`=false;
{code}
Drill uses only about 170 MB of direct memory and returns the correct result
for the query that I used to reproduce the bug:
{noformat}
+--------------+
| EXPR$0 |
+--------------+
| 14559802949 |
+--------------+
1 row selected (561.779 seconds)
{noformat}
Also, the bug described in DRILL-2236 does not reproduce on the current version
when this option is disabled:
{noformat}
0: jdbc:drill:zk=local> explain plan for select c.c_custkey, c.c_name, n.n_name
. . . . . . . . . . . > from nation n, customer c
. . . . . . . . . . . > where n.n_nationkey = c.c_nationkey;
+------+------+
| text | json |
+------+------+
| 00-00 Screen
00-01   Project(c_custkey=[$0], c_name=[$1], n_name=[$2])
00-02     Project(c_custkey=[$3], c_name=[$4], n_name=[$1])
00-03       Project(n_nationkey=[$3], n_name=[$4], c_nationkey=[$0], c_custkey=[$1], c_name=[$2])
00-04         HashJoin(condition=[=($3, $0)], joinType=[inner])
00-06           Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/customer]], selectionRoot=file:/tmp/customer, numFiles=1, usedMetadataFile=false, columns=[`c_nationkey`, `c_custkey`, `c_name`]]])
00-05           Scan(groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/tmp/nation]], selectionRoot=file:/tmp/nation, numFiles=1, usedMetadataFile=false, columns=[`n_nationkey`, `n_name`]]])
{noformat}
DRILL-2236 contains the comment {{Please note that once we resolve the issue of
SwapJoinRule, we should remove this workaround solution in Drill's code.}}
Since the bug from DRILL-2236 no longer reproduces, and since we do not see the
OOM when the hash join inputs are not swapped, I propose to revert the fix for
DRILL-2236.
\[1\]
https://github.com/mapr/drill-test-framework/tree/master/framework/resources/Datasources/Tpch0.01/parquet
> 25 way join ended up with OOM
> -----------------------------
>
> Key: DRILL-1162
> URL: https://issues.apache.org/jira/browse/DRILL-1162
> Project: Apache Drill
> Issue Type: Bug
> Components: Execution - Flow, Query Planning & Optimization
> Reporter: Rahul Challapalli
> Assignee: Volodymyr Vysotskyi
> Priority: Critical
> Fix For: Future
>
> Attachments: error.log, oom_error.log
>
>
> git.commit.id.abbrev=e5c2da0
> The below query results in 0 results being returned
> select count(*) from `lineitem1.parquet` a
> inner join `part.parquet` j on a.l_partkey = j.p_partkey
> inner join `orders.parquet` k on a.l_orderkey = k.o_orderkey
> inner join `supplier.parquet` l on a.l_suppkey = l.s_suppkey
> inner join `partsupp.parquet` m on j.p_partkey = m.ps_partkey and l.s_suppkey
> = m.ps_suppkey
> inner join `customer.parquet` n on k.o_custkey = n.c_custkey
> inner join `lineitem2.parquet` b on a.l_orderkey = b.l_orderkey
> inner join `lineitem2.parquet` c on a.l_partkey = c.l_partkey
> inner join `lineitem2.parquet` d on a.l_suppkey = d.l_suppkey
> inner join `lineitem2.parquet` e on a.l_extendedprice = e.l_extendedprice
> inner join `lineitem2.parquet` f on a.l_comment = f.l_comment
> inner join `lineitem2.parquet` g on a.l_shipdate = g.l_shipdate
> inner join `lineitem2.parquet` h on a.l_commitdate = h.l_commitdate
> inner join `lineitem2.parquet` i on a.l_receiptdate = i.l_receiptdate
> inner join `lineitem2.parquet` o on a.l_receiptdate = o.l_receiptdate
> inner join `lineitem2.parquet` p on a.l_receiptdate = p.l_receiptdate
> inner join `lineitem2.parquet` q on a.l_receiptdate = q.l_receiptdate
> inner join `lineitem2.parquet` r on a.l_receiptdate = r.l_receiptdate
> inner join `lineitem2.parquet` s on a.l_receiptdate = s.l_receiptdate
> inner join `lineitem2.parquet` t on a.l_receiptdate = t.l_receiptdate
> inner join `lineitem2.parquet` u on a.l_receiptdate = u.l_receiptdate
> inner join `lineitem2.parquet` v on a.l_receiptdate = v.l_receiptdate
> inner join `lineitem2.parquet` w on a.l_receiptdate = w.l_receiptdate
> inner join `lineitem2.parquet` x on a.l_receiptdate = x.l_receiptdate;
> However when we remove the last 'inner join' and run the query it returns
> '716372534'. Since the last inner join is similar to the one's before it, it
> should match some records and return the data appropriately.
> The logs indicated that it actually returned 0 results. Attached the log file.