Repository: spark Updated Branches: refs/heads/master ddcd2e826 -> 9f5647d62
[SPARK-21319][SQL] Fix memory leak in sorter ## What changes were proposed in this pull request? `UnsafeExternalSorter.recordComparator` can be either `KVComparator` or `RowComparator`, and both of them will keep the reference to the input rows they compared last time. After sorting, we return the sorted iterator to upstream operators. However, the upstream operators may take a while to consume up the sorted iterator, and `UnsafeExternalSorter` is registered to `TaskContext` at [here](https://github.com/apache/spark/blob/v2.2.0/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L159-L161), which means we will keep the `UnsafeExternalSorter` instance and keep the last compared input rows in memory until the sorted iterator is consumed up. Things get worse if we sort within partitions of a dataset and coalesce all partitions into one, as we will keep a lot of input rows in memory and the time to consume up all the sorted iterators is long. This PR takes over https://github.com/apache/spark/pull/18543 , the idea is that, we do not keep the record comparator instance in `UnsafeExternalSorter`, but a generator of record comparator. close #18543 ## How was this patch tested? N/A Author: Wenchen Fan <[email protected]> Closes #18679 from cloud-fan/memory-leak. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9f5647d6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9f5647d6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9f5647d6 Branch: refs/heads/master Commit: 9f5647d62ee569a4c8bdc242adcb8d4e05c662f9 Parents: ddcd2e8 Author: Wenchen Fan <[email protected]> Authored: Thu Jul 27 22:56:26 2017 +0800 Committer: Wenchen Fan <[email protected]> Committed: Thu Jul 27 22:56:26 2017 +0800 ---------------------------------------------------------------------- .../unsafe/sort/UnsafeExternalSorter.java | 40 ++++++++++++++------ .../unsafe/sort/UnsafeExternalSorterSuite.java | 4 +- .../sql/execution/UnsafeExternalRowSorter.java | 4 +- .../sql/execution/UnsafeKVExternalSorter.java | 16 +++++--- 4 files changed, 42 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9f5647d6/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index a6e858c..e2059ce 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.IOException; import java.util.LinkedList; import java.util.Queue; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -48,8 +49,16 @@ public final class UnsafeExternalSorter extends MemoryConsumer { @Nullable private final PrefixComparator prefixComparator; + + /** + * {@link RecordComparator} may probably keep the reference to the records they compared last + * time, so we should not keep a {@link RecordComparator} instance inside + * {@link UnsafeExternalSorter}, because {@link UnsafeExternalSorter} is referenced by + * {@link TaskContext} and thus can not be garbage collected until the end of the task. + */ @Nullable - private final RecordComparator recordComparator; + private final Supplier<RecordComparator> recordComparatorSupplier; + private final TaskMemoryManager taskMemoryManager; private final BlockManager blockManager; private final SerializerManager serializerManager; @@ -90,14 +99,14 @@ public final class UnsafeExternalSorter extends MemoryConsumer { BlockManager blockManager, SerializerManager serializerManager, TaskContext taskContext, - RecordComparator recordComparator, + Supplier<RecordComparator> recordComparatorSupplier, PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, long numElementsForSpillThreshold, UnsafeInMemorySorter inMemorySorter) throws IOException { UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager, - serializerManager, taskContext, recordComparator, prefixComparator, initialSize, + serializerManager, taskContext, recordComparatorSupplier, prefixComparator, initialSize, numElementsForSpillThreshold, pageSizeBytes, inMemorySorter, false /* ignored */); sorter.spill(Long.MAX_VALUE, sorter); // The external sorter will be used to insert records, in-memory sorter is not needed. @@ -110,14 +119,14 @@ public final class UnsafeExternalSorter extends MemoryConsumer { BlockManager blockManager, SerializerManager serializerManager, TaskContext taskContext, - RecordComparator recordComparator, + Supplier<RecordComparator> recordComparatorSupplier, PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, long numElementsForSpillThreshold, boolean canUseRadixSort) { return new UnsafeExternalSorter(taskMemoryManager, blockManager, serializerManager, - taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, + taskContext, recordComparatorSupplier, prefixComparator, initialSize, pageSizeBytes, numElementsForSpillThreshold, null, canUseRadixSort); } @@ -126,7 +135,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { BlockManager blockManager, SerializerManager serializerManager, TaskContext taskContext, - RecordComparator recordComparator, + Supplier<RecordComparator> recordComparatorSupplier, PrefixComparator prefixComparator, int initialSize, long pageSizeBytes, @@ -138,15 +147,24 @@ public final class UnsafeExternalSorter extends MemoryConsumer { this.blockManager = blockManager; this.serializerManager = serializerManager; this.taskContext = taskContext; - this.recordComparator = recordComparator; + this.recordComparatorSupplier = recordComparatorSupplier; this.prefixComparator = prefixComparator; // Use getSizeAsKb (not bytes) to maintain backwards compatibility for units // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024 this.fileBufferSizeBytes = 32 * 1024; if (existingInMemorySorter == null) { + RecordComparator comparator = null; + if (recordComparatorSupplier != null) { + comparator = recordComparatorSupplier.get(); + } this.inMemSorter = new UnsafeInMemorySorter( - this, taskMemoryManager, recordComparator, prefixComparator, initialSize, canUseRadixSort); + this, + taskMemoryManager, + comparator, + prefixComparator, + initialSize, + canUseRadixSort); } else { this.inMemSorter = existingInMemorySorter; } @@ -451,14 +469,14 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * after consuming this iterator. */ public UnsafeSorterIterator getSortedIterator() throws IOException { - assert(recordComparator != null); + assert(recordComparatorSupplier != null); if (spillWriters.isEmpty()) { assert(inMemSorter != null); readingIterator = new SpillableIterator(inMemSorter.getSortedIterator()); return readingIterator; } else { - final UnsafeSorterSpillMerger spillMerger = - new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size()); + final UnsafeSorterSpillMerger spillMerger = new UnsafeSorterSpillMerger( + recordComparatorSupplier.get(), prefixComparator, spillWriters.size()); for (UnsafeSorterSpillWriter spillWriter : spillWriters) { spillMerger.addSpillIfNotEmpty(spillWriter.getReader(serializerManager)); } http://git-wip-us.apache.org/repos/asf/spark/blob/9f5647d6/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index cd5db1a..5330a68 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -154,7 +154,7 @@ public class UnsafeExternalSorterSuite { blockManager, serializerManager, taskContext, - recordComparator, + () -> recordComparator, prefixComparator, /* initialSize */ 1024, pageSizeBytes, @@ -440,7 +440,7 @@ public class UnsafeExternalSorterSuite { blockManager, serializerManager, taskContext, - recordComparator, + () -> recordComparator, prefixComparator, 1024, pageSizeBytes, http://git-wip-us.apache.org/repos/asf/spark/blob/9f5647d6/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index 53b0886..12a123e 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -84,7 +84,7 @@ public final class UnsafeExternalRowSorter { sparkEnv.blockManager(), sparkEnv.serializerManager(), taskContext, - new RowComparator(ordering, schema.length()), + () -> new RowComparator(ordering, schema.length()), prefixComparator, sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize", DEFAULT_INITIAL_SORT_BUFFER_SIZE), @@ -195,12 +195,10 @@ public final class UnsafeExternalRowSorter { private static final class RowComparator extends RecordComparator { private final Ordering<InternalRow> ordering; - private final int numFields; private final UnsafeRow row1; private final UnsafeRow row2; RowComparator(Ordering<InternalRow> ordering, int numFields) { - this.numFields = numFields; this.row1 = new UnsafeRow(numFields); this.row2 = new UnsafeRow(numFields); this.ordering = ordering; http://git-wip-us.apache.org/repos/asf/spark/blob/9f5647d6/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java ---------------------------------------------------------------------- diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index d8acf11..6aa52f1 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution; import javax.annotation.Nullable; import java.io.IOException; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; @@ -76,7 +77,8 @@ public final class UnsafeKVExternalSorter { prefixComputer = SortPrefixUtils.createPrefixGenerator(keySchema); PrefixComparator prefixComparator = SortPrefixUtils.getPrefixComparator(keySchema); BaseOrdering ordering = GenerateOrdering.create(keySchema); - KVComparator recordComparator = new KVComparator(ordering, keySchema.length()); + Supplier<RecordComparator> comparatorSupplier = + () -> new KVComparator(ordering, keySchema.length()); boolean canUseRadixSort = keySchema.length() == 1 && SortPrefixUtils.canSortFullyWithPrefix(keySchema.apply(0)); @@ -88,7 +90,7 @@ public final class UnsafeKVExternalSorter { blockManager, serializerManager, taskContext, - recordComparator, + comparatorSupplier, prefixComparator, SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), @@ -104,7 +106,11 @@ public final class UnsafeKVExternalSorter { // as the underlying array for in-memory sorter (it's always large enough). // Since we will not grow the array, it's fine to pass `null` as consumer. final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter( - null, taskMemoryManager, recordComparator, prefixComparator, map.getArray(), + null, + taskMemoryManager, + comparatorSupplier.get(), + prefixComparator, + map.getArray(), canUseRadixSort); // We cannot use the destructive iterator here because we are reusing the existing memory @@ -137,7 +143,7 @@ public final class UnsafeKVExternalSorter { blockManager, serializerManager, taskContext, - new KVComparator(ordering, keySchema.length()), + comparatorSupplier, prefixComparator, SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), @@ -227,10 +233,8 @@ public final class UnsafeKVExternalSorter { private final BaseOrdering ordering; private final UnsafeRow row1; private final UnsafeRow row2; - private final int numKeyFields; KVComparator(BaseOrdering ordering, int numKeyFields) { - this.numKeyFields = numKeyFields; this.row1 = new UnsafeRow(numKeyFields); this.row2 = new UnsafeRow(numKeyFields); this.ordering = ordering; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
