[
https://issues.apache.org/jira/browse/SPARK-21319?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
James Baker updated SPARK-21319:
--------------------------------
Description:
When we wish to sort within partitions, we produce an UnsafeExternalRowSorter.
This contains an UnsafeExternalSorter, which contains the
UnsafeExternalRowSorter.RowComparator.
The UnsafeExternalSorter adds a task completion listener which performs any
additional required cleanup. The upshot of this is that we maintain a reference
to the UnsafeExternalRowSorter.RowComparator until the end of the task.
The RowComparator looks like
{code:java}
private static final class RowComparator extends RecordComparator {
    private final Ordering<InternalRow> ordering;
    private final int numFields;
    private final UnsafeRow row1;
    private final UnsafeRow row2;

    RowComparator(Ordering<InternalRow> ordering, int numFields) {
        this.numFields = numFields;
        this.row1 = new UnsafeRow(numFields);
        this.row2 = new UnsafeRow(numFields);
        this.ordering = ordering;
    }

    @Override
    public int compare(Object baseObj1, long baseOff1, Object baseObj2, long baseOff2) {
        // TODO: Why are the sizes -1?
        row1.pointTo(baseObj1, baseOff1, -1);
        row2.pointTo(baseObj2, baseOff2, -1);
        return ordering.compare(row1, row2);
    }
}
{code}
which means that row1 and row2 keep references to the last baseObjs that were
passed in, and these references are not tracked for purposes of memory
allocation.
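To make the retention path concrete, here is a minimal, self-contained sketch. It does not use the Spark classes: RowPointer, RowComparator, TaskContext and Sorter below are simplified stand-ins invented for illustration, and clearPointers() is a hypothetical mitigation, not something taken from the Spark code base. It only shows that a comparator reusing two mutable row pointers keeps the last compared base objects reachable for as long as the task's completion listener is registered.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the classes named above; none of these are the real Spark APIs.
final class ComparatorRetentionSketch {

    /** Stand-in for UnsafeRow: a reusable pointer into some base object. */
    static final class RowPointer {
        Object baseObject;
        long baseOffset;

        void pointTo(Object baseObject, long baseOffset) {
            this.baseObject = baseObject;
            this.baseOffset = baseOffset;
        }
    }

    /** Stand-in for RowComparator: reuses two pointers across compare() calls. */
    static final class RowComparator {
        final RowPointer row1 = new RowPointer();
        final RowPointer row2 = new RowPointer();

        int compare(byte[] base1, int off1, byte[] base2, int off2) {
            // After this call returns, row1/row2 still reference base1/base2.
            row1.pointTo(base1, off1);
            row2.pointTo(base2, off2);
            return Byte.compare(base1[off1], base2[off2]);
        }

        /** Hypothetical mitigation: drop the stale references once sorting is done. */
        void clearPointers() {
            row1.pointTo(null, 0);
            row2.pointTo(null, 0);
        }
    }

    /** Stand-in for a task context that holds its completion listeners until the task ends. */
    static final class TaskContext {
        final List<Runnable> completionListeners = new ArrayList<>();
    }

    /** Stand-in for the sorter: its registered listener keeps it, and its comparator, reachable. */
    static final class Sorter {
        final RowComparator comparator = new RowComparator();

        Sorter(TaskContext context) {
            // The method reference captures `this`, so context -> listener -> sorter -> comparator.
            context.completionListeners.add(this::cleanup);
        }

        void cleanup() {
            // Placeholder for end-of-task cleanup; intentionally does nothing here.
        }
    }

    public static void main(String[] args) {
        TaskContext context = new TaskContext();
        Sorter sorter = new Sorter(context);

        byte[] pageA = {3};
        byte[] pageB = {1};
        sorter.comparator.compare(pageA, 0, pageB, 0);

        // Sorting is over, but the comparator still points at both pages.
        System.out.println(sorter.comparator.row1.baseObject == pageA); // true
        System.out.println(sorter.comparator.row2.baseObject == pageB); // true

        // With the hypothetical mitigation applied, the pages are no longer pinned.
        sorter.comparator.clearPointers();
        System.out.println(sorter.comparator.row1.baseObject == null);  // true
    }
}
```

In this sketch the byte arrays play the role of the baseObjs: until clearPointers() runs or the task context releases its listeners, they cannot be garbage collected even though nothing is sorting them any more.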
We have a job which sorts within partitions and then coalesces partitions -
this has a tendency to OOM because of the references to old UnsafeRows that
were used during the sorting.
This is a screenshot of a memory dump during a task - our JVM has two executor
threads.
!hprof.png|thumbnail!
The dump shows 2 references inside of row iterators, and 11 more which are
known only to the task completion listener or held as part of memory
management.
> UnsafeExternalRowSorter.RowComparator memory leak
> -------------------------------------------------
>
> Key: SPARK-21319
> URL: https://issues.apache.org/jira/browse/SPARK-21319
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0
> Reporter: James Baker
> Attachments: hprof.png