Repository: spark
Updated Branches:
  refs/heads/master a11db942a -> 7c92351f4


[MINOR][CORE] Cleanup dead code and duplication in Mem. Management

## What changes were proposed in this pull request?

* Removed the method 
`org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`.
It became unused as a result of 85b0a157543201895557d66306b38b3ca52f2151
(SPARK-15962) introducing word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and unsafe sorters
  * The change extracting the exception paths is more than just cosmetic, since
it measurably reduces the compiled size of the affected methods

## How was this patch tested?

* Build still passes after removing the method, grepping the codebase for 
`alignToWords` shows no reference to it anywhere either.
* The deduplicated code is covered by existing tests.

Author: Armin <m...@obrown.io>

Closes #19254 from original-brownbear/cleanup-mem-consumer.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c92351f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c92351f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c92351f

Branch: refs/heads/master
Commit: 7c92351f43ac4b1710e3c80c78f7978dad491ed2
Parents: a11db94
Author: Armin <m...@obrown.io>
Authored: Tue Sep 19 10:06:32 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Sep 19 10:06:32 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/memory/MemoryConsumer.java | 26 ++++++++--------
 .../spark/unsafe/map/BytesToBytesMap.java       | 24 ++++++---------
 .../unsafe/sort/UnsafeExternalSorter.java       | 32 +++++++++-----------
 .../expressions/codegen/UnsafeRowWriter.java    | 16 ----------
 4 files changed, 37 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java 
b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 4099fb0..0efae16 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -89,13 +89,7 @@ public abstract class MemoryConsumer {
     long required = size * 8L;
     MemoryBlock page = taskMemoryManager.allocatePage(required, this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of 
memory, got " + got);
+      throwOom(page, required);
     }
     used += required;
     return new LongArray(page);
@@ -116,13 +110,7 @@ public abstract class MemoryConsumer {
   protected MemoryBlock allocatePage(long required) {
     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, 
required), this);
     if (page == null || page.size() < required) {
-      long got = 0;
-      if (page != null) {
-        got = page.size();
-        taskMemoryManager.freePage(page, this);
-      }
-      taskMemoryManager.showMemoryUsage();
-      throw new OutOfMemoryError("Unable to acquire " + required + " bytes of 
memory, got " + got);
+      throwOom(page, required);
     }
     used += page.size();
     return page;
@@ -152,4 +140,14 @@ public abstract class MemoryConsumer {
     taskMemoryManager.releaseExecutionMemory(size, this);
     used -= size;
   }
+
+  private void throwOom(final MemoryBlock page, final long required) {
+    long got = 0;
+    if (page != null) {
+      got = page.size();
+      taskMemoryManager.freePage(page, this);
+    }
+    taskMemoryManager.showMemoryUsage();
+    throw new OutOfMemoryError("Unable to acquire " + required + " bytes of 
memory, got " + got);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java 
b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 610ace3..4fadfe3 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -283,13 +283,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
         } else {
           currentPage = null;
           if (reader != null) {
-            // remove the spill file from disk
-            File file = spillWriters.removeFirst().getFile();
-            if (file != null && file.exists()) {
-              if (!file.delete()) {
-                logger.error("Was unable to delete spill file {}", 
file.getAbsolutePath());
-              }
-            }
+            handleFailedDelete();
           }
           try {
             Closeables.close(reader, /* swallowIOException = */ false);
@@ -307,13 +301,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
     public boolean hasNext() {
       if (numRecords == 0) {
         if (reader != null) {
-          // remove the spill file from disk
-          File file = spillWriters.removeFirst().getFile();
-          if (file != null && file.exists()) {
-            if (!file.delete()) {
-              logger.error("Was unable to delete spill file {}", 
file.getAbsolutePath());
-            }
-          }
+          handleFailedDelete();
         }
       }
       return numRecords > 0;
@@ -403,6 +391,14 @@ public final class BytesToBytesMap extends MemoryConsumer {
     public void remove() {
       throw new UnsupportedOperationException();
     }
+
+    private void handleFailedDelete() {
+      // remove the spill file from disk
+      File file = spillWriters.removeFirst().getFile();
+      if (file != null && file.exists() && !file.delete()) {
+        logger.error("Was unable to delete spill file {}", 
file.getAbsolutePath());
+      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index de44640..39eda00 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -219,15 +219,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
         new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, 
writeMetrics,
           inMemSorter.numRecords());
       spillWriters.add(spillWriter);
-      final UnsafeSorterIterator sortedRecords = 
inMemSorter.getSortedIterator();
-      while (sortedRecords.hasNext()) {
-        sortedRecords.loadNext();
-        final Object baseObject = sortedRecords.getBaseObject();
-        final long baseOffset = sortedRecords.getBaseOffset();
-        final int recordLength = sortedRecords.getRecordLength();
-        spillWriter.write(baseObject, baseOffset, recordLength, 
sortedRecords.getKeyPrefix());
-      }
-      spillWriter.close();
+      spillIterator(inMemSorter.getSortedIterator(), spillWriter);
     }
 
     final long spillSize = freeMemory();
@@ -488,6 +480,18 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
     }
   }
 
+  private static void spillIterator(UnsafeSorterIterator inMemIterator,
+      UnsafeSorterSpillWriter spillWriter) throws IOException {
+    while (inMemIterator.hasNext()) {
+      inMemIterator.loadNext();
+      final Object baseObject = inMemIterator.getBaseObject();
+      final long baseOffset = inMemIterator.getBaseOffset();
+      final int recordLength = inMemIterator.getRecordLength();
+      spillWriter.write(baseObject, baseOffset, recordLength, 
inMemIterator.getKeyPrefix());
+    }
+    spillWriter.close();
+  }
+
   /**
    * An UnsafeSorterIterator that support spilling.
    */
@@ -503,6 +507,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
       this.numRecords = inMemIterator.getNumRecords();
     }
 
+    @Override
     public int getNumRecords() {
       return numRecords;
     }
@@ -521,14 +526,7 @@ public final class UnsafeExternalSorter extends 
MemoryConsumer {
         // Iterate over the records that have not been returned and spill them.
         final UnsafeSorterSpillWriter spillWriter =
           new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, 
writeMetrics, numRecords);
-        while (inMemIterator.hasNext()) {
-          inMemIterator.loadNext();
-          final Object baseObject = inMemIterator.getBaseObject();
-          final long baseOffset = inMemIterator.getBaseOffset();
-          final int recordLength = inMemIterator.getRecordLength();
-          spillWriter.write(baseObject, baseOffset, recordLength, 
inMemIterator.getKeyPrefix());
-        }
-        spillWriter.close();
+        spillIterator(inMemIterator, spillWriter);
         spillWriters.add(spillWriter);
         nextUpstream = spillWriter.getReader(serializerManager);
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7c92351f/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
index 4776617..5d9515c 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeRowWriter.java
@@ -109,22 +109,6 @@ public class UnsafeRowWriter {
     Platform.putLong(holder.buffer, fieldOffset, offsetAndSize);
   }
 
-  // Do word alignment for this row and grow the row buffer if needed.
-  // todo: remove this after we make unsafe array data word align.
-  public void alignToWords(int numBytes) {
-    final int remainder = numBytes & 0x07;
-
-    if (remainder > 0) {
-      final int paddingBytes = 8 - remainder;
-      holder.grow(paddingBytes);
-
-      for (int i = 0; i < paddingBytes; i++) {
-        Platform.putByte(holder.buffer, holder.cursor, (byte) 0);
-        holder.cursor++;
-      }
-    }
-  }
-
   public void write(int ordinal, boolean value) {
     final long offset = getFieldOffset(ordinal);
     Platform.putLong(holder.buffer, offset, 0L);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to