WIP.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7155ffbd
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7155ffbd
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7155ffbd

Branch: refs/heads/ignite-3228-1
Commit: 7155ffbdee0673b79e803d64ed8c5249e7c996e0
Parents: d94acbc
Author: thatcoach <[email protected]>
Authored: Wed Jun 1 19:54:29 2016 +0300
Committer: thatcoach <[email protected]>
Committed: Wed Jun 1 19:54:29 2016 +0300

----------------------------------------------------------------------
 .../shuffle/collections/HadoopMultimapBase.java | 36 ++++++++++++--------
 1 file changed, 22 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7155ffbd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
index c8fc539..2a56884 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java
@@ -25,7 +25,6 @@ import 
org.apache.ignite.internal.processors.hadoop.HadoopTaskContext;
 import 
org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream;
 import 
org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream;
 import 
org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer;
-import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.jetbrains.annotations.Nullable;
 
@@ -49,7 +48,7 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
     protected final int pageSize;
 
     /** */
-    private final Collection<GridLongList> allPages = new 
ConcurrentLinkedQueue<>();
+    private final Collection<Page> allPages = new ConcurrentLinkedQueue<>();
 
     /**
      * @param jobInfo Job info.
@@ -65,11 +64,12 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
     }
 
     /**
-     * @param ptrs Page pointers.
+     * @param page Page.
      */
-    private void deallocate(GridLongList ptrs) {
-        while (!ptrs.isEmpty())
-            mem.release(ptrs.remove(), ptrs.remove());
+    private void deallocate(Page page) {
+        assert page != null;
+
+        mem.release(page.ptr, page.size);
     }
 
     /**
@@ -106,8 +106,8 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
 
     /** {@inheritDoc} */
     @Override public void close() {
-        for (GridLongList list : allPages)
-            deallocate(list);
+        for (Page page : allPages)
+            deallocate(page);
     }
 
     /**
@@ -191,8 +191,8 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
         /** */
         private long writeStart;
 
-        /** Size and pointer pairs list. */
-        private final GridLongList pages = new GridLongList(16);
+        /** Current page. */
+        private Page curPage;
 
         /**
          * @param ctx Task context.
@@ -226,9 +226,6 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
             long newPageSize = ((writtenSize + requestedSize) % pageSize + 1) 
* pageSize;
             long newPagePtr = mem.allocate(newPageSize);
 
-            pages.add(newPageSize);
-            pages.add(newPagePtr);
-
             HadoopOffheapBuffer b = out.buffer();
 
             b.set(newPagePtr, newPageSize);
@@ -241,6 +238,16 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
 
             writeStart = newPagePtr;
 
+            // At this point old page is not needed, so we release it.
+            Page oldPage = curPage;
+
+            curPage = new Page(newPagePtr, newPageSize);
+
+            if (oldPage != null)
+                allPages.add(oldPage);
+            // TODO: Must deallocate at this point.
+//                deallocate(oldPage);
+
             return b.move(requestedSize);
         }
 
@@ -318,7 +325,8 @@ public abstract class HadoopMultimapBase implements 
HadoopMultimap {
 
         /** {@inheritDoc} */
         @Override public void close() throws IgniteCheckedException {
-            allPages.add(pages);
+            if (curPage != null)
+                allPages.add(curPage);
 
             keySer.close();
             valSer.close();

Reply via email to