Repository: ignite Updated Branches: refs/heads/ignite-3235 [created] 7b3f4944d
IGNITE-3228: Hadoop: workaround/fix for inefficient memory usage. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2490b0ae Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2490b0ae Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2490b0ae Branch: refs/heads/ignite-3235 Commit: 2490b0aef17595247f8a6ae29482bc900e2a9a8d Parents: 52a2637 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Thu Jun 2 09:11:09 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Thu Jun 2 09:11:09 2016 +0300 ---------------------------------------------------------------------- .../shuffle/collections/HadoopMultimapBase.java | 90 +++++++++++++++++--- 1 file changed, 76 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2490b0ae/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java index e6995ca..7dcff3d 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/collections/HadoopMultimapBase.java @@ -30,7 +30,6 @@ import org.apache.ignite.internal.processors.hadoop.HadoopTaskContext; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataInStream; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopDataOutStream; import org.apache.ignite.internal.processors.hadoop.shuffle.streams.HadoopOffheapBuffer; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.jetbrains.annotations.Nullable; @@ -48,7 +47,7 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { protected final int pageSize; /** */ - private final Collection<GridLongList> allPages = new ConcurrentLinkedQueue<>(); + private final Collection<Page> allPages = new ConcurrentLinkedQueue<>(); /** * @param jobInfo Job info. @@ -64,11 +63,12 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { } /** - * @param ptrs Page pointers. + * @param page Page. */ - private void deallocate(GridLongList ptrs) { - while (!ptrs.isEmpty()) - mem.release(ptrs.remove(), ptrs.remove()); + private void deallocate(Page page) { + assert page != null; + + mem.release(page.ptr, page.size); } /** @@ -105,8 +105,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** {@inheritDoc} */ @Override public void close() { - for (GridLongList list : allPages) - deallocate(list); + for (Page page : allPages) + deallocate(page); } /** @@ -190,8 +190,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** */ private long writeStart; - /** Size and pointer pairs list. */ - private final GridLongList pages = new GridLongList(16); + /** Current page. */ + private Page curPage; /** * @param ctx Task context. @@ -222,11 +222,10 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { private long allocateNextPage(long requestedSize) { int writtenSize = writtenSize(); - long newPageSize = Math.max(writtenSize + requestedSize, pageSize); + long newPageSize = nextPageSize(writtenSize + requestedSize); long newPagePtr = mem.allocate(newPageSize); - pages.add(newPageSize); - pages.add(newPagePtr); + System.out.println("ALLOCATED: " + newPageSize); HadoopOffheapBuffer b = out.buffer(); @@ -240,10 +239,50 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { writeStart = newPagePtr; + // At this point old page is not needed, so we release it. + Page oldPage = curPage; + + curPage = new Page(newPagePtr, newPageSize); + + if (oldPage != null) + allPages.add(oldPage); + return b.move(requestedSize); } /** + * Get next page size. + * + * @param required Required amount of data. + * @return Next page size. + */ + private long nextPageSize(long required) { + long pages = (required / pageSize) + 1; + + long pagesPow2 = nextPowerOfTwo(pages); + + return pagesPow2 * pageSize; + } + + /** + * Get next power of two which greater or equal to the given number. Naive implementation. + * + * @param val Number + * @return Nearest pow2. + */ + private long nextPowerOfTwo(long val) { + long res = 1; + + while (res < val) + res = res << 1; + + if (res < 0) + throw new IllegalArgumentException("Value is too big to find positive pow2: " + val); + + return res; + } + + /** * @return Fixed pointer. */ private long fixAlignment() { @@ -317,7 +356,8 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { - allPages.add(pages); + if (curPage != null) + allPages.add(curPage); keySer.close(); valSer.close(); @@ -372,4 +412,26 @@ public abstract class HadoopMultimapBase implements HadoopMultimap { throw new UnsupportedOperationException(); } } + + /** + * Page. + */ + private static class Page { + /** Pointer. */ + private final long ptr; + + /** Size. */ + private final long size; + + /** + * Constructor. + * + * @param ptr Pointer. + * @param size Size. + */ + public Page(long ptr, long size) { + this.ptr = ptr; + this.size = size; + } + } } \ No newline at end of file