Repository: ignite
Updated Branches:
  refs/heads/ignite-3235 [created] 7b3f4944d


IGNITE-3228: Hadoop: workaround/fix for inefficient memory usage.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2490b0ae
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2490b0ae
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2490b0ae

Branch: refs/heads/ignite-3235
Commit: 2490b0aef17595247f8a6ae29482bc900e2a9a8d
Parents: 52a2637
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Thu Jun 2 09:11:09 2016 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Thu Jun 2 09:11:09 2016 +0300

----------------------------------------------------------------------
 .../shuffle/collections/HadoopMultimapBase.java | 90 +++++++++++++++++---
 1 file changed, 76 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2490b0ae/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
index e6995ca..7dcff3d 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -30,7 +30,6 @@ import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import 
org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
 import 
org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
 import 
org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.jetbrains.annotations.Nullable;
 
@@ -48,7 +47,7 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
     protected final int pageSize;
 
     /** */
-    private final Collection<GridLongList> allPages = new 
ConcurrentLinkedQueue<>();
+    private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
 
     /**
      * @param jobInfo Job info.
@@ -64,11 +63,12 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
     }
 
     /**
-     * @param ptrs Page pointers.
+     * Releases the memory region described by the given page by passing its pointer and size
+     * to {@code mem.release}.
+     *
+     * @param page Page to release (asserted to be non-null).
      */
-    private void deallocate(GridLongList ptrs) {
-        while (!ptrs.isEmpty())
-            mem.release(ptrs.remove(), ptrs.remove());
+    private void deallocate(Page page) {
+        assert page != null;
+
+        mem.release(page.ptr, page.size);
     }
 
     /**
@@ -105,8 +105,8 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
 
     /** {@inheritDoc} */
     @Override public void close() {
-        for (GridLongList list : allPages)
-            deallocate(list);
+        for (Page page : allPages)
+            deallocate(page);
     }
 
     /**
@@ -190,8 +190,8 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
         /** */
         private long writeStart;
 
-        /** Size and pointer pairs list. */
-        private final GridLongList pages = new GridLongList(16);
+        /** Current page. */
+        private Page curPage;
 
         /**
          * @param ctx Task context.
@@ -222,11 +222,10 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
         private long allocateNextPage(long requestedSize) {
             int writtenSize = writtenSize();
 
-            long newPageSize = Math.max(writtenSize + requestedSize, pageSize);
+            long newPageSize = nextPageSize(writtenSize + requestedSize);
             long newPagePtr = mem.allocate(newPageSize);
 
-            pages.add(newPageSize);
-            pages.add(newPagePtr);
+            System.out.println("ALLOCATED: " + newPageSize);
 
             HadoopOffheapBuffer b = out.buffer();
 
@@ -240,10 +239,50 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
 
             writeStart = newPagePtr;
 
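+            // The old page is replaced by the new one. It is not released here: it is registered
+            // in allPages, and the multimap's close() deallocates every page found there.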
+            Page oldPage = curPage;
+
+            curPage = new Page(newPagePtr, newPageSize);
+
+            if (oldPage != null)
+                allPages.add(oldPage);
+
             return b.move(requestedSize);
         }
 
         /**
+         * Computes the size of the next page to allocate: the page count
+         * {@code required / pageSize + 1} is rounded up to a power of two and
+         * then multiplied by {@code pageSize}.
+         *
+         * @param required Required amount of data.
+         * @return Next page size.
+         */
+        private long nextPageSize(long required) {
+            long pageCnt = nextPowerOfTwo(required / pageSize + 1);
+
+            return pageCnt * pageSize;
+        }
+
+        /**
+         * Gets the smallest power of two that is greater than or equal to the given number.
+         * Values less than or equal to {@code 1} yield {@code 1}. Naive implementation.
+         *
+         * @param val Number.
+         * @return Nearest power of two not less than {@code val}.
+         * @throws IllegalArgumentException If {@code val} exceeds {@code 2^62}, the largest power
+         *      of two representable as a positive {@code long}.
+         */
+        private long nextPowerOfTwo(long val) {
+            // The bound must be checked before looping: past 2^62 the shifted value becomes
+            // negative and then zero, so "res < val" would stay true forever.
+            if (val > (1L << 62))
+                throw new IllegalArgumentException("Value is too big to find positive pow2: " + val);
+
+            long res = 1;
+
+            while (res < val)
+                res <<= 1;
+
+            return res;
+        }
+
+        /**
          * @return Fixed pointer.
          */
         private long fixAlignment() {
@@ -317,7 +356,8 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
 
         /** {@inheritDoc} */
         @Override public void close() throws IgniteCheckedException {
-            allPages.add(pages);
+            if (curPage != null)
+                allPages.add(curPage);
 
             keySer.close();
             valSer.close();
@@ -372,4 +412,26 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
             throw new UnsupportedOperationException();
         }
     }
+
+    /**
+     * Immutable descriptor of a single allocated page: its pointer and its size.
+     */
+    private static final class Page {
+        /** Pointer to the page, as returned by {@code mem.allocate}. */
+        private final long ptr;
+
+        /** Size of the page, as requested from {@code mem.allocate}. */
+        private final long size;
+
+        /**
+         * Creates a page descriptor.
+         *
+         * @param ptr Pointer.
+         * @param size Size.
+         */
+        public Page(long ptr, long size) {
+            this.size = size;
+            this.ptr = ptr;
+        }
+    }
 }
\ No newline at end of file

Reply via email to