TEZ-3244. Allow overlap of input and output memory when they are not concurrent. (jlowe)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/63ae97d5 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/63ae97d5 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/63ae97d5 Branch: refs/heads/TEZ-1190 Commit: 63ae97d5f3fe6e30e3c5f7c9a892ef9902e83b39 Parents: b3a3af3 Author: Jason Lowe <[email protected]> Authored: Tue Feb 7 13:32:37 2017 -0600 Committer: Jason Lowe <[email protected]> Committed: Tue Feb 7 13:32:37 2017 -0600 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/tez/dag/api/TezConfiguration.java | 30 ++++ .../common/resources/MemoryDistributor.java | 12 +- .../WeightedScalingMemoryDistributor.java | 62 ++++++- .../TestWeightedScalingMemoryDistributor.java | 165 +++++++++++++++++++ 5 files changed, 264 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 16e239f..a7cc0ce 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3244. Allow overlap of input and output memory when they are not concurrent TEZ-3581. Add different logger to enable suppressing logs for specific lines. TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels TEZ-3600. Fix flaky test: TestTokenCache @@ -197,6 +198,7 @@ INCOMPATIBLE CHANGES ALL CHANGES: + TEZ-3244. Allow overlap of input and output memory when they are not concurrent TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific levels TEZ-3462. Task attempt failure during container shutdown loses useful container diagnostics TEZ-3574. Container reuse won't pickup extra dag level local resource. 
http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index fd71b35..94f40bb 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -875,6 +875,36 @@ public class TezConfiguration extends Configuration { public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS = TEZ_TASK_PREFIX + "scale.memory.ratios"; + /** + * Concurrent input/output memory allocation control. When enabled memory + * distributions assume that inputs and outputs will use their memory + * simultaneously. When disabled the distributions assume that outputs are not + * initialized until inputs release memory buffers, allowing inputs to + * leverage memory normally set aside for outputs and vice-versa. + * NOTE: This property currently is not supported by the ScalingAllocator + * memory distributor. + */ + @Private + @Unstable + @ConfigurationScope(Scope.VERTEX) + public static final String TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT = + TEZ_TASK_PREFIX + "scale.memory.input-output-concurrent"; + public static final boolean TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT = true; + + /** + * Controls distributing output memory to inputs when non-concurrent I/O + * memory allocation is being used. When disabled inputs will receive the + * same memory allocation as if concurrent I/O memory allocation were used. + * NOTE: This property currently is not supported by the ScalingAllocator + * memory distributor. 
+ */ + @Private + @Unstable + @ConfigurationScope(Scope.VERTEX) + public static final String TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED = + TEZ_TASK_PREFIX + "scale.memory.non-concurrent-inputs.enabled"; + public static final boolean TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT = false; + @Private @Unstable /** http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java index c822357..e63a414 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java @@ -61,6 +61,7 @@ public class MemoryDistributor { private long totalJvmMemory; private final boolean isEnabled; + private final boolean isInputOutputConcurrent; private final String allocatorClassName; private final Set<TaskContext> dupSet = Collections .newSetFromMap(new ConcurrentHashMap<TaskContext, Boolean>()); @@ -78,6 +79,9 @@ public class MemoryDistributor { this.conf = conf; isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED, TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT); + isInputOutputConcurrent = conf.getBoolean( + TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, + TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT); if (isEnabled) { allocatorClassName = conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS, @@ -213,9 +217,11 @@ public class MemoryDistributor { Preconditions.checkState(numAllocations == numRequestors, "Number of allocations must match number of requestors. 
Allocated=" + numAllocations + ", Requests: " + numRequestors); - Preconditions.checkState(totalAllocated <= totalJvmMemory, - "Total allocation should be <= availableMem. TotalAllocated: " + totalAllocated - + ", totalJvmMemory: " + totalJvmMemory); + if (isInputOutputConcurrent) { + Preconditions.checkState(totalAllocated <= totalJvmMemory, + "Total allocation should be <= availableMem. TotalAllocated: " + totalAllocated + + ", totalJvmMemory: " + totalJvmMemory); + } } http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java index 8477300..c5b4fb0 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.common.resources.InitialMemoryAllocator; import org.apache.tez.runtime.common.resources.InitialMemoryRequestContext; +import org.apache.tez.runtime.common.resources.InitialMemoryRequestContext.ComponentType; import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy; import org.apache.tez.runtime.library.input.UnorderedKVInput; @@ -129,9 +130,15 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator + availableForAllocation + ", TotalRequested/TotalJVMHeap:" + new DecimalFormat("0.00").format(ratio)); + int 
numInputRequestsScaled = 0; + int numOutputRequestsScaled = 0; + long totalInputAllocated = 0; + long totalOutputAllocated = 0; + // Actual scaling List<Long> allocations = Lists.newArrayListWithCapacity(numRequests); for (Request request : requests) { + long allocated = 0; if (request.requestSize == 0) { allocations.add(0l); if (LOG.isDebugEnabled()) { @@ -141,7 +148,7 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator } else { double requestFactor = request.requestWeight / (double) numRequestsScaled; double scaledRequest = requestFactor * request.requestSize; - long allocated = Math.min( + allocated = Math.min( (long) ((scaledRequest / totalScaledRequest) * availableForAllocation), request.requestSize); // TODO Later - If requestedSize is used, the difference (allocated - @@ -152,9 +159,52 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator + request.requestType + " " + request.requestSize + " to allocated: " + allocated); } } + + if (request.componentType == ComponentType.INPUT) { + numInputRequestsScaled += request.requestWeight; + totalInputAllocated += allocated; + } else if (request.componentType == ComponentType.OUTPUT) { + numOutputRequestsScaled += request.requestWeight; + totalOutputAllocated += allocated; + } + } + + if (!conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, + TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT)) { + adjustAllocationsForNonConcurrent(allocations, requests, + numInputRequestsScaled, totalInputAllocated, + numOutputRequestsScaled, totalOutputAllocated); } + return allocations; + } + private void adjustAllocationsForNonConcurrent(List<Long> allocations, + List<Request> requests, int numInputsScaled, long totalInputAllocated, + int numOutputsScaled, long totalOutputAllocated) { + boolean inputsEnabled = conf.getBoolean( + TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, + 
TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT); + LOG.info("Adjusting scaled allocations for I/O non-concurrent." + + " numInputsScaled: {} InputAllocated: {} numOutputsScaled: {} outputAllocated: {} inputsEnabled: {}", + numInputsScaled, totalInputAllocated, numOutputsScaled, totalOutputAllocated, inputsEnabled); + for (int i = 0; i < requests.size(); i++) { + Request request = requests.get(i); + long additional = 0; + if (request.componentType == ComponentType.INPUT && inputsEnabled) { + double share = request.requestWeight / (double)numInputsScaled; + additional = (long) (totalOutputAllocated * share); + } else if (request.componentType == ComponentType.OUTPUT) { + double share = request.requestWeight / (double)numOutputsScaled; + additional = (long) (totalInputAllocated * share); + } + if (additional > 0) { + long newTotal = Math.min(allocations.get(i) + additional, request.requestSize); + // TODO Later - If requestedSize is used, the difference could be allocated to others. 
+ allocations.set(i, newTotal); + LOG.debug("Adding {} to {} total={}", additional, request.componentClassname, newTotal); + } + } } private void initialProcessMemoryRequestContext(InitialMemoryRequestContext context) { @@ -164,9 +214,10 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator String className = context.getComponentClassName(); requestType = getRequestTypeForClass(className); Integer typeScaleFactor = getScaleFactorForType(requestType); + ComponentType componentType = context.getComponentType(); - Request request = new Request(context.getComponentClassName(), context.getRequestedSize(), - requestType, typeScaleFactor); + Request request = new Request(context.getComponentClassName(), componentType, + context.getRequestedSize(), requestType, typeScaleFactor); requests.add(request); numRequestsScaled += typeScaleFactor; } @@ -293,14 +344,17 @@ public class WeightedScalingMemoryDistributor implements InitialMemoryAllocator } private static class Request { - Request(String componentClassname, long requestSize, RequestType requestType, int requestWeight) { + Request(String componentClassname, ComponentType componentType, long requestSize, + RequestType requestType, int requestWeight) { this.componentClassname = componentClassname; + this.componentType = componentType; this.requestSize = requestSize; this.requestType = requestType; this.requestWeight = requestWeight; } String componentClassname; + ComponentType componentType; long requestSize; private RequestType requestType; private int requestWeight; http://git-wip-us.apache.org/repos/asf/tez/blob/63ae97d5/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java 
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java index a38497c..2fbe264 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java @@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock; import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.OutputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.runtime.api.LogicalInput; @@ -32,6 +33,7 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.InputContext; import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; import org.apache.tez.runtime.library.input.UnorderedKVInput; import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput; @@ -145,6 +147,169 @@ public class TestWeightedScalingMemoryDistributor extends TestMemoryDistributor assertEquals(1500, e4Callback.assigned); } + @Test(timeout = 5000) + public void testWeightedScalingNonConcurrent() throws TezException { + Configuration conf = new Configuration(this.conf); + conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false); + conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, true); + conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.2); + conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS, + WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1)); + 
System.err.println(Joiner.on(",").join(conf.getStringCollection( + TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS))); + + MemoryDistributor dist = new MemoryDistributor(2, 2, conf); + + dist.setJvmMemory(10000l); + + // First request - ScatterGatherShuffleInput + MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest(); + InputContext e1InputContext1 = createTestInputContext(); + InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class); + dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1); + + // Second request - BroadcastInput + MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest(); + InputContext e2InputContext2 = createTestInputContext(); + InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class); + dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2); + + // Third request - randomOutput (simulates MROutput) + MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest(); + OutputContext e3OutputContext1 = createTestOutputContext(); + OutputDescriptor e3OutDesc1 = createTestOutputDescriptor(); + dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1); + + // Fourth request - OnFileSortedOutput + MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest(); + OutputContext e4OutputContext2 = createTestOutputContext(); + OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class); + dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2); + + // Fifth request - Processor + MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest(); + ProcessorContext e5ProcContext = createTestProcessortContext(); + ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor(); + dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc); + + dist.makeInitialAllocations(); + + // Total available: 80% of 10K = 8000 + // 5 requests (weight) - 10K (3), 10K(1), 
10K(1), 10K(2), 10K(1) + // Overlap input and output memory + assertEquals(5250, e1Callback.assigned); + assertEquals(1750, e2Callback.assigned); + assertEquals(2333, e3Callback.assigned); + assertEquals(4666, e4Callback.assigned); + assertEquals(1000, e5Callback.assigned); + } + + @Test(timeout = 5000) + public void testAdditionalReserveFractionWeightedScalingNonConcurrent() throws TezException { + Configuration conf = new Configuration(this.conf); + conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false); + conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, true); + conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS, + WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 2, 3, 6, 1, 1)); + conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO, 0.025d); + conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX, 0.2d); + + MemoryDistributor dist = new MemoryDistributor(2, 2, conf); + + dist.setJvmMemory(10000l); + + // First request - ScatterGatherShuffleInput [weight 6] + MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest(); + InputContext e1InputContext1 = createTestInputContext(); + InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class); + dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1); + + // Second request - BroadcastInput [weight 2] + MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest(); + InputContext e2InputContext2 = createTestInputContext(); + InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class); + dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2); + + // Third request - randomOutput (simulates MROutput) [weight 1] + MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest(); + OutputContext e3OutputContext1 = createTestOutputContext(); + 
OutputDescriptor e3OutDesc1 = createTestOutputDescriptor(); + dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1); + + // Fourth request - OnFileSortedOutput [weight 3] + MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest(); + OutputContext e4OutputContext2 = createTestOutputContext(); + OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class); + dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2); + + dist.makeInitialAllocations(); + + // Total available: 60% of 10K = 6000 + // 4 requests (weight) - 10K (6), 10K(2), 10K(1), 10K(3) + // Overlap input and output memory + assertEquals(4500, e1Callback.assigned); + assertEquals(1500, e2Callback.assigned); + assertEquals(1500, e3Callback.assigned); + assertEquals(4500, e4Callback.assigned); + } + + @Test(timeout = 5000) + public void testWeightedScalingNonConcurrentInputsDisabled() throws TezException { + Configuration conf = new Configuration(this.conf); + conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT, false); + conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED, false); + conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 0.2); + conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS, + WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3, 1, 1)); + System.err.println(Joiner.on(",").join(conf.getStringCollection( + TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS))); + + MemoryDistributor dist = new MemoryDistributor(2, 2, conf); + + dist.setJvmMemory(10000l); + + // First request - ScatterGatherShuffleInput + MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest(); + InputContext e1InputContext1 = createTestInputContext(); + InputDescriptor e1InDesc1 = createTestInputDescriptor(OrderedGroupedKVInput.class); + dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1); + + // 
Second request - BroadcastInput + MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest(); + InputContext e2InputContext2 = createTestInputContext(); + InputDescriptor e2InDesc2 = createTestInputDescriptor(UnorderedKVInput.class); + dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2); + + // Third request - randomOutput (simulates MROutput) + MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest(); + OutputContext e3OutputContext1 = createTestOutputContext(); + OutputDescriptor e3OutDesc1 = createTestOutputDescriptor(); + dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1); + + // Fourth request - OnFileSortedOutput + MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest(); + OutputContext e4OutputContext2 = createTestOutputContext(); + OutputDescriptor e4OutDesc2 = createTestOutputDescriptor(OrderedPartitionedKVOutput.class); + dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2); + + // Fifth request - Processor + MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest(); + ProcessorContext e5ProcContext = createTestProcessortContext(); + ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor(); + dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc); + + dist.makeInitialAllocations(); + + // Total available: 80% of 10K = 8000 + // 5 requests (weight) - 10K (3), 10K(1), 10K(1), 10K(2), 10K(1) + // Overlap input and output memory + assertEquals(3000, e1Callback.assigned); + assertEquals(1000, e2Callback.assigned); + assertEquals(2333, e3Callback.assigned); + assertEquals(4666, e4Callback.assigned); + assertEquals(1000, e5Callback.assigned); + } + private static class MemoryUpdateCallbackForTest extends MemoryUpdateCallback { long assigned = -1000;
