WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7155ffbd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7155ffbd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7155ffbd Branch: refs/heads/ignite-3228-1 Commit: 7155ffbdee0673b79e803d64ed8c5249e7c996e0 Parents: d94acbc Author: thatcoach <[email protected]> Authored: Wed Jun 1 19:54:29 2016 +0300 Committer: thatcoach <[email protected]> Committed: Wed Jun 1 19:54:29 2016 +0300 ---------------------------------------------------------------------- .../shuffle/collections/HadoopMultimapBase.java | 36 ++++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7155ffbd/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java index c8fc539..2a56884 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.jetbrains.annotations.Nullable; @@ -49,7 +48,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { protected final int pageSize; /** */ - private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>(); + private final Collection<Page> allPages = new ConcurrentLinkedQueue<>(); /** * @param jobInfo Job info. @@ -65,11 +64,12 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { } /** - * @param ptrs Page pointers. + * @param page Page. */ - private void deallocate(GridLongList ptrs) { - while (!ptrs.isEmpty()) - mem.release(ptrs.remove(), ptrs.remove()); + private void deallocate(Page page) { + assert page != null; + + mem.release(page.ptr, page.size); } /** @@ -106,8 +106,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** {@inheritDoc} */ @Override public void close() { - for (GridLongList list : allPages) - deallocate(list); + for (Page page : allPages) + deallocate(page); } /** @@ -191,8 +191,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** */ private long writeStart; - /** Size and pointer pairs list. */ - private final GridLongList pages = new GridLongList(16); + /** Current page. */ + private Page curPage; /** * @param ctx Task context. @@ -226,9 +226,6 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { long newPageSize = ((writtenSize + requestedSize) % pageSize + 1) * pageSize; long newPagePtr = mem.allocate(newPageSize); - pages.add(newPageSize); - pages.add(newPagePtr); - HadoopOffheapBuffer b = out.buffer(); b.set(newPagePtr, newPageSize); @@ -241,6 +238,16 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { writeStart = newPagePtr; + // At this point old page is not needed, so we release it. + Page oldPage = curPage; + + curPage = new Page(newPagePtr, newPageSize); + + if (oldPage != null) + allPages.add(oldPage); + // TODO: Must deallocate at this point. +// deallocate(oldPage); + return b.move(requestedSize); } @@ -318,7 +325,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - allPages.add(pages); + if (curPage != null) + allPages.add(curPage); keySer.close(); valSer.close();
