Repository: ignite
Updated Branches:
  refs/heads/ignite-3228 [created] f661ab1ea


IGNITE-3228: Hadoop: fixed page allocation issue.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f661ab1e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f661ab1e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f661ab1e

Branch: refs/heads/ignite-3228
Commit: f661ab1ea75a46906ed7c4748e3b04a62cdff049
Parents: 52a2637
Author: vozerov-gridgain <[email protected]>
Authored: Wed Jun 1 18:06:55 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Wed Jun 1 18:06:55 2016 +0300

----------------------------------------------------------------------
 .../shuffle/collections/HadoopMultimapBase.java | 58 +++++++++++++++-----
 1 file changed, 43 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f661ab1e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
index e6995ca..1bd7bc8 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
 import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
 import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.jetbrains.annotations.Nullable;
 
@@ -48,7 +47,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
     protected final int pageSize;
 
     /** */
-    private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>();
+    private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
 
     /**
      * @param jobInfo Job info.
@@ -64,11 +63,12 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
     }
 
     /**
-     * @param ptrs Page pointers.
+     * @param page Page.
      */
-    private void deallocate(GridLongList ptrs) {
-        while (!ptrs.isEmpty())
-            mem.release(ptrs.remove(), ptrs.remove());
+    private void deallocate(Page page) {
+        assert page != null;
+
+        mem.release(page.ptr, page.size);
     }
 
     /**
@@ -105,8 +105,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
 
     /** {@inheritDoc} */
     @Override public void close() {
-        for (GridLongList list : allPages)
-            deallocate(list);
+        for (Page page : allPages)
+            deallocate(page);
     }
 
     /**
@@ -190,8 +190,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
         /** */
         private long writeStart;
 
-        /** Size and pointer pairs list. */
-        private final GridLongList pages = new GridLongList(16);
+        /** Current page. */
+        private Page curPage;
 
         /**
          * @param ctx Task context.
@@ -222,12 +222,9 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
         private long allocateNextPage(long requestedSize) {
             int writtenSize = writtenSize();
 
-            long newPageSize = Math.max(writtenSize + requestedSize, pageSize);
+            long newPageSize = ((writtenSize + requestedSize) % pageSize + 1) * pageSize;
             long newPagePtr = mem.allocate(newPageSize);
 
-            pages.add(newPageSize);
-            pages.add(newPagePtr);
-
             HadoopOffheapBuffer b = out.buffer();
 
             b.set(newPagePtr, newPageSize);
@@ -240,6 +237,14 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
 
             writeStart = newPagePtr;
 
+            // At this point old page is not needed, so we release it.
+            Page oldPage = curPage;
+
+            curPage = new Page(newPagePtr, newPageSize);
+
+            if (oldPage != null)
+                deallocate(oldPage);
+
             return b.move(requestedSize);
         }
 
@@ -317,7 +322,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
 
         /** {@inheritDoc} */
         @Override public void close() throws IgniteCheckedException {
-            allPages.add(pages);
+            if (curPage != null)
+                allPages.add(curPage);
 
             keySer.close();
             valSer.close();
@@ -372,4 +378,26 @@ public abstract class HadoopMultimapBase implements HadoopMultimap {
             throw new UnsupportedOperationException();
         }
     }
+
+    /**
+     * Page.
+     */
+    private static class Page {
+        /** Pointer. */
+        private final long ptr;
+
+        /** Size. */
+        private final long size;
+
+        /**
+         * Constructor.
+         *
+         * @param ptr Pointer.
+         * @param size Size.
+         */
+        public Page(long ptr, long size) {
+            this.ptr = ptr;
+            this.size = size;
+        }
+    }
 }
\ No newline at end of file

Reply via email to