Repository: ignite Updated Branches: refs/heads/ignite-3228 [created] f661ab1ea
IGNITE-3228: Hadoop: fixed page allocation issue. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f661ab1e Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f661ab1e Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f661ab1e Branch: refs/heads/ignite-3228 Commit: f661ab1ea75a46906ed7c4748e3b04a62cdff049 Parents: 52a2637 Author: vozerov-gridgain <[email protected]> Authored: Wed Jun 1 18:06:55 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jun 1 18:06:55 2016 +0300 ---------------------------------------------------------------------- .../shuffle/collections/HadoopMultimapBase.java | 58 +++++++++++++++----- 1 file changed, 43 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f661ab1e/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java index e6995ca..1bd7bc8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.jetbrains.annotations.Nullable; @@ -48,7 +47,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { protected final int pageSize; /** */ - private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>(); + private final Collection<Page> allPages = new ConcurrentLinkedQueue<>(); /** * @param jobInfo Job info. @@ -64,11 +63,12 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { } /** - * @param ptrs Page pointers. + * @param page Page. */ - private void deallocate(GridLongList ptrs) { - while (!ptrs.isEmpty()) - mem.release(ptrs.remove(), ptrs.remove()); + private void deallocate(Page page) { + assert page != null; + + mem.release(page.ptr, page.size); } /** @@ -105,8 +105,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** {@inheritDoc} */ @Override public void close() { - for (GridLongList list : allPages) - deallocate(list); + for (Page page : allPages) + deallocate(page); } /** @@ -190,8 +190,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** */ private long writeStart; - /** Size and pointer pairs list. */ - private final GridLongList pages = new GridLongList(16); + /** Current page. */ + private Page curPage; /** * @param ctx Task context. @@ -222,12 +222,9 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { private long allocateNextPage(long requestedSize) { int writtenSize = writtenSize(); - long newPageSize = Math.max(writtenSize + requestedSize, pageSize); + long newPageSize = ((writtenSize + requestedSize) % pageSize + 1) * pageSize; long newPagePtr = mem.allocate(newPageSize); - pages.add(newPageSize); - pages.add(newPagePtr); - HadoopOffheapBuffer b = out.buffer(); b.set(newPagePtr, newPageSize); @@ -240,6 +237,14 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { writeStart = newPagePtr; + // At this point old page is not needed, so we release it. + Page oldPage = curPage; + + curPage = new Page(newPagePtr, newPageSize); + + if (oldPage != null) + deallocate(oldPage); + return b.move(requestedSize); } @@ -317,7 +322,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - allPages.add(pages); + if (curPage != null) + allPages.add(curPage); keySer.close(); valSer.close(); @@ -372,4 +378,26 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { throw new UnsupportedOperationException(); } } + + /** + * Page. + */ + private static class Page { + /** Pointer. */ + private final long ptr; + + /** Size. */ + private final long size; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param size Size. + */ + public Page(long ptr, long size) { + this.ptr = ptr; + this.size = size; + } + } } \ No newline at end of file
