Repository: spark Updated Branches: refs/heads/master 071eaaf9d -> cf0cce903
[SPARK-17113] [SHUFFLE] Job failure due to Executor OOM in offheap mode ## What changes were proposed in this pull request? This PR fixes an executor OOM in off-heap mode caused by a bug in cooperative memory management for UnsafeExternalSorter. When spilling, UnsafeExternalSorter decided whether a memory page was still in use by the upstream iterator by comparing the base object of each allocated page with the base object of the upstream iterator. However, with off-heap memory allocation the base objects are always null, so every page compared equal to the upstream iterator's page. As a result no pages were released by the spill, and eventually the operator ran out of memory. This PR compares page numbers instead of base objects. The following is the stack trace this issue addresses: java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170) ## How was this patch tested? Tested by running the failing job. Author: Sital Kedia <ske...@fb.com> Closes #14693 from sitalkedia/fix_offheap_oom. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cf0cce90 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cf0cce90 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cf0cce90 Branch: refs/heads/master Commit: cf0cce90364d17afe780ff9a5426dfcefa298535 Parents: 071eaaf Author: Sital Kedia <ske...@fb.com> Authored: Fri Aug 19 11:27:30 2016 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Fri Aug 19 11:27:30 2016 -0700 ---------------------------------------------------------------------- .../util/collection/unsafe/sort/UnsafeExternalSorter.java | 2 +- .../util/collection/unsafe/sort/UnsafeInMemorySorter.java | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/cf0cce90/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 8d596f8..ccf7664 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -522,7 +522,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { // is accessing the current record. We free this page in that caller's next loadNext() // call. 
for (MemoryBlock page : allocatedPages) { - if (!loaded || page.getBaseObject() != upstream.getBaseObject()) { + if (!loaded || page.pageNumber != ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) { released += page.size(); freePage(page); } else { http://git-wip-us.apache.org/repos/asf/spark/blob/cf0cce90/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 78da389..30d0f30 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -248,6 +248,7 @@ public final class UnsafeInMemorySorter { private long baseOffset; private long keyPrefix; private int recordLength; + private long currentPageNumber; private SortedIterator(int numRecords, int offset) { this.numRecords = numRecords; @@ -262,6 +263,7 @@ public final class UnsafeInMemorySorter { iter.baseOffset = baseOffset; iter.keyPrefix = keyPrefix; iter.recordLength = recordLength; + iter.currentPageNumber = currentPageNumber; return iter; } @@ -279,6 +281,7 @@ public final class UnsafeInMemorySorter { public void loadNext() { // This pointer points to a 4-byte record length, followed by the record's bytes final long recordPointer = array.get(offset + position); + currentPageNumber = memoryManager.decodePageNumber(recordPointer); baseObject = memoryManager.getPage(recordPointer); baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length recordLength = Platform.getInt(baseObject, baseOffset - 4); @@ -292,6 +295,10 @@ public final class UnsafeInMemorySorter { @Override public long getBaseOffset() { return baseOffset; } + 
public long getCurrentPageNumber() { + return currentPageNumber; + } + @Override public int getRecordLength() { return recordLength; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org