Repository: tez
Updated Branches:
refs/heads/branch-0.8 26d179f8c -> ec89a6332
TEZ-3244. Allow overlap of input and output memory when they are not
concurrent. (jlowe)
(cherry picked from commit 63ae97d5f3fe6e30e3c5f7c9a892ef9902e83b39)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ec89a633
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ec89a633
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ec89a633
Branch: refs/heads/branch-0.8
Commit: ec89a63329b9355bc7a0896c7ba5eb0dfd7bddae
Parents: 26d179f
Author: Jason Lowe <[email protected]>
Authored: Tue Feb 7 13:32:37 2017 -0600
Committer: Jason Lowe <[email protected]>
Committed: Tue Feb 7 13:37:05 2017 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/api/TezConfiguration.java | 30 ++++
.../common/resources/MemoryDistributor.java | 12 +-
.../WeightedScalingMemoryDistributor.java | 62 ++++++-
.../TestWeightedScalingMemoryDistributor.java | 165 +++++++++++++++++++
5 files changed, 263 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/ec89a633/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 55fdfc9..aa18dcc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-3244. Allow overlap of input and output memory when they are not
concurrent
TEZ-3601. Add another HistoryLogLevel to suppress TaskAttempts at specific
levels
TEZ-3582. Exception swallowed in PipelinedSorter causing incorrect results.
TEZ-3462. Task attempt failure during container shutdown loses useful
container diagnostics
http://git-wip-us.apache.org/repos/asf/tez/blob/ec89a633/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 6144399..571fd1b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -875,6 +875,36 @@ public class TezConfiguration extends Configuration {
public static final String TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS =
TEZ_TASK_PREFIX + "scale.memory.ratios";
+ /**
+ * Concurrent input/output memory allocation control. When enabled memory
+ * distributions assume that inputs and outputs will use their memory
+ * simultaneously. When disabled the distributions assume that outputs are
not
+ * initialized until inputs release memory buffers, allowing inputs to
+ * leverage memory normally set aside for outputs and vice-versa.
+ * NOTE: This property currently is not supported by the ScalingAllocator
+ * memory distributor.
+ */
+ @Private
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT =
+ TEZ_TASK_PREFIX + "scale.memory.input-output-concurrent";
+ public static final boolean
TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT = true;
+
+ /**
+ * Controls distributing output memory to inputs when non-concurrent I/O
+ * memory allocation is being used. When enabled inputs will receive the
+ * same memory allocation as if concurrent I/O memory allocation were used.
+ * NOTE: This property currently is not supported by the ScalingAllocator
+ * memory distributor.
+ */
+ @Private
+ @Unstable
+ @ConfigurationScope(Scope.VERTEX)
+ public static final String
TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED =
+ TEZ_TASK_PREFIX + "scale.memory.non-concurrent-inputs.enabled";
+ public static final boolean
TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT = false;
+
@Private
@Unstable
/**
http://git-wip-us.apache.org/repos/asf/tez/blob/ec89a633/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
index c822357..e63a414 100644
---
a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
+++
b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/common/resources/MemoryDistributor.java
@@ -61,6 +61,7 @@ public class MemoryDistributor {
private long totalJvmMemory;
private final boolean isEnabled;
+ private final boolean isInputOutputConcurrent;
private final String allocatorClassName;
private final Set<TaskContext> dupSet = Collections
.newSetFromMap(new ConcurrentHashMap<TaskContext, Boolean>());
@@ -78,6 +79,9 @@ public class MemoryDistributor {
this.conf = conf;
isEnabled = conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED,
TezConfiguration.TEZ_TASK_SCALE_MEMORY_ENABLED_DEFAULT);
+ isInputOutputConcurrent = conf.getBoolean(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
+
TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT);
if (isEnabled) {
allocatorClassName =
conf.get(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
@@ -213,9 +217,11 @@ public class MemoryDistributor {
Preconditions.checkState(numAllocations == numRequestors,
"Number of allocations must match number of requestors. Allocated=" +
numAllocations
+ ", Requests: " + numRequestors);
- Preconditions.checkState(totalAllocated <= totalJvmMemory,
- "Total allocation should be <= availableMem. TotalAllocated: " +
totalAllocated
- + ", totalJvmMemory: " + totalJvmMemory);
+ if (isInputOutputConcurrent) {
+ Preconditions.checkState(totalAllocated <= totalJvmMemory,
+ "Total allocation should be <= availableMem. TotalAllocated: " +
totalAllocated
+ + ", totalJvmMemory: " + totalJvmMemory);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/ec89a633/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
index 8477300..c5b4fb0 100644
---
a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
+++
b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/resources/WeightedScalingMemoryDistributor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.common.resources.InitialMemoryAllocator;
import org.apache.tez.runtime.common.resources.InitialMemoryRequestContext;
+import
org.apache.tez.runtime.common.resources.InitialMemoryRequestContext.ComponentType;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.OrderedGroupedInputLegacy;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
@@ -129,9 +130,15 @@ public class WeightedScalingMemoryDistributor implements
InitialMemoryAllocator
+ availableForAllocation + ", TotalRequested/TotalJVMHeap:"
+ new DecimalFormat("0.00").format(ratio));
+ int numInputRequestsScaled = 0;
+ int numOutputRequestsScaled = 0;
+ long totalInputAllocated = 0;
+ long totalOutputAllocated = 0;
+
// Actual scaling
List<Long> allocations = Lists.newArrayListWithCapacity(numRequests);
for (Request request : requests) {
+ long allocated = 0;
if (request.requestSize == 0) {
allocations.add(0l);
if (LOG.isDebugEnabled()) {
@@ -141,7 +148,7 @@ public class WeightedScalingMemoryDistributor implements
InitialMemoryAllocator
} else {
double requestFactor = request.requestWeight / (double)
numRequestsScaled;
double scaledRequest = requestFactor * request.requestSize;
- long allocated = Math.min(
+ allocated = Math.min(
(long) ((scaledRequest / totalScaledRequest) *
availableForAllocation),
request.requestSize);
// TODO Later - If requestedSize is used, the difference (allocated -
@@ -152,9 +159,52 @@ public class WeightedScalingMemoryDistributor implements
InitialMemoryAllocator
+ request.requestType + " " + request.requestSize + " to
allocated: " + allocated);
}
}
+
+ if (request.componentType == ComponentType.INPUT) {
+ numInputRequestsScaled += request.requestWeight;
+ totalInputAllocated += allocated;
+ } else if (request.componentType == ComponentType.OUTPUT) {
+ numOutputRequestsScaled += request.requestWeight;
+ totalOutputAllocated += allocated;
+ }
+ }
+
+ if
(!conf.getBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
+
TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT_DEFAULT)) {
+ adjustAllocationsForNonConcurrent(allocations, requests,
+ numInputRequestsScaled, totalInputAllocated,
+ numOutputRequestsScaled, totalOutputAllocated);
}
+
return allocations;
+ }
+ private void adjustAllocationsForNonConcurrent(List<Long> allocations,
+ List<Request> requests, int numInputsScaled, long totalInputAllocated,
+ int numOutputsScaled, long totalOutputAllocated) {
+ boolean inputsEnabled = conf.getBoolean(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED,
+
TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED_DEFAULT);
+ LOG.info("Adjusting scaled allocations for I/O non-concurrent."
+ + " numInputsScaled: {} InputAllocated: {} numOutputsScaled: {}
outputAllocated: {} inputsEnabled: {}",
+ numInputsScaled, totalInputAllocated, numOutputsScaled,
totalOutputAllocated, inputsEnabled);
+ for (int i = 0; i < requests.size(); i++) {
+ Request request = requests.get(i);
+ long additional = 0;
+ if (request.componentType == ComponentType.INPUT && inputsEnabled) {
+ double share = request.requestWeight / (double)numInputsScaled;
+ additional = (long) (totalOutputAllocated * share);
+ } else if (request.componentType == ComponentType.OUTPUT) {
+ double share = request.requestWeight / (double)numOutputsScaled;
+ additional = (long) (totalInputAllocated * share);
+ }
+ if (additional > 0) {
+ long newTotal = Math.min(allocations.get(i) + additional,
request.requestSize);
+ // TODO Later - If requestedSize is used, the difference could be
allocated to others.
+ allocations.set(i, newTotal);
+ LOG.debug("Adding {} to {} total={}", additional,
request.componentClassname, newTotal);
+ }
+ }
}
private void initialProcessMemoryRequestContext(InitialMemoryRequestContext
context) {
@@ -164,9 +214,10 @@ public class WeightedScalingMemoryDistributor implements
InitialMemoryAllocator
String className = context.getComponentClassName();
requestType = getRequestTypeForClass(className);
Integer typeScaleFactor = getScaleFactorForType(requestType);
+ ComponentType componentType = context.getComponentType();
- Request request = new Request(context.getComponentClassName(),
context.getRequestedSize(),
- requestType, typeScaleFactor);
+ Request request = new Request(context.getComponentClassName(),
componentType,
+ context.getRequestedSize(), requestType, typeScaleFactor);
requests.add(request);
numRequestsScaled += typeScaleFactor;
}
@@ -293,14 +344,17 @@ public class WeightedScalingMemoryDistributor implements
InitialMemoryAllocator
}
private static class Request {
- Request(String componentClassname, long requestSize, RequestType
requestType, int requestWeight) {
+ Request(String componentClassname, ComponentType componentType, long
requestSize,
+ RequestType requestType, int requestWeight) {
this.componentClassname = componentClassname;
+ this.componentType = componentType;
this.requestSize = requestSize;
this.requestType = requestType;
this.requestWeight = requestWeight;
}
String componentClassname;
+ ComponentType componentType;
long requestSize;
private RequestType requestType;
private int requestWeight;
http://git-wip-us.apache.org/repos/asf/tez/blob/ec89a633/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
----------------------------------------------------------------------
diff --git
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
index a38497c..2fbe264 100644
---
a/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
+++
b/tez-runtime-library/src/test/java/org/apache/tez/runtime/common/resources/TestWeightedScalingMemoryDistributor.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.mock;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.runtime.api.LogicalInput;
@@ -32,6 +33,7 @@ import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.MemoryUpdateCallback;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.OrderedPartitionedKVOutput;
@@ -145,6 +147,169 @@ public class TestWeightedScalingMemoryDistributor extends
TestMemoryDistributor
assertEquals(1500, e4Callback.assigned);
}
+ @Test(timeout = 5000)
+ public void testWeightedScalingNonConcurrent() throws TezException {
+ Configuration conf = new Configuration(this.conf);
+
conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
false);
+
conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED,
true);
+ conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION,
0.2);
+ conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+ WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3,
1, 1));
+ System.err.println(Joiner.on(",").join(conf.getStringCollection(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS)));
+
+ MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+ dist.setJvmMemory(10000l);
+
+ // First request - ScatterGatherShuffleInput
+ MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+ InputContext e1InputContext1 = createTestInputContext();
+ InputDescriptor e1InDesc1 =
createTestInputDescriptor(OrderedGroupedKVInput.class);
+ dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+ // Second request - BroadcastInput
+ MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+ InputContext e2InputContext2 = createTestInputContext();
+ InputDescriptor e2InDesc2 =
createTestInputDescriptor(UnorderedKVInput.class);
+ dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+ // Third request - randomOutput (simulates MROutput)
+ MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e3OutputContext1 = createTestOutputContext();
+ OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+ dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+ // Fourth request - OnFileSortedOutput
+ MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e4OutputContext2 = createTestOutputContext();
+ OutputDescriptor e4OutDesc2 =
createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+ dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+ // Fifth request - Processor
+ MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest();
+ ProcessorContext e5ProcContext = createTestProcessortContext();
+ ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor();
+ dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc);
+
+ dist.makeInitialAllocations();
+
+ // Total available: 80% of 10K = 8000
+ // 5 requests (weight) - 10K (3), 10K(1), 10K(1), 10K(2), 10K(1)
+ // Overlap input and output memory
+ assertEquals(5250, e1Callback.assigned);
+ assertEquals(1750, e2Callback.assigned);
+ assertEquals(2333, e3Callback.assigned);
+ assertEquals(4666, e4Callback.assigned);
+ assertEquals(1000, e5Callback.assigned);
+ }
+
+ @Test(timeout = 5000)
+ public void testAdditionalReserveFractionWeightedScalingNonConcurrent()
throws TezException {
+ Configuration conf = new Configuration(this.conf);
+
conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
false);
+
conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED,
true);
+ conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+ WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 2, 3, 6,
1, 1));
+
conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_PER_IO,
0.025d);
+
conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ADDITIONAL_RESERVATION_FRACTION_MAX,
0.2d);
+
+ MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+ dist.setJvmMemory(10000l);
+
+ // First request - ScatterGatherShuffleInput [weight 6]
+ MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+ InputContext e1InputContext1 = createTestInputContext();
+ InputDescriptor e1InDesc1 =
createTestInputDescriptor(OrderedGroupedKVInput.class);
+ dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+ // Second request - BroadcastInput [weight 2]
+ MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+ InputContext e2InputContext2 = createTestInputContext();
+ InputDescriptor e2InDesc2 =
createTestInputDescriptor(UnorderedKVInput.class);
+ dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+ // Third request - randomOutput (simulates MROutput) [weight 1]
+ MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e3OutputContext1 = createTestOutputContext();
+ OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+ dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+ // Fourth request - OnFileSortedOutput [weight 3]
+ MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e4OutputContext2 = createTestOutputContext();
+ OutputDescriptor e4OutDesc2 =
createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+ dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+ dist.makeInitialAllocations();
+
+ // Total available: 60% of 10K = 6000
+ // 4 requests (weight) - 10K (6), 10K(2), 10K(1), 10K(3)
+ // Overlap input and output memory
+ assertEquals(4500, e1Callback.assigned);
+ assertEquals(1500, e2Callback.assigned);
+ assertEquals(1500, e3Callback.assigned);
+ assertEquals(4500, e4Callback.assigned);
+ }
+
+ @Test(timeout = 5000)
+ public void testWeightedScalingNonConcurrentInputsDisabled() throws
TezException {
+ Configuration conf = new Configuration(this.conf);
+
conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_INPUT_OUTPUT_CONCURRENT,
false);
+
conf.setBoolean(TezConfiguration.TEZ_TASK_SCALE_MEMORY_NON_CONCURRENT_INPUTS_ENABLED,
false);
+ conf.setDouble(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION,
0.2);
+ conf.setStrings(TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS,
+ WeightedScalingMemoryDistributor.generateWeightStrings(0, 0, 1, 2, 3,
1, 1));
+ System.err.println(Joiner.on(",").join(conf.getStringCollection(
+ TezConfiguration.TEZ_TASK_SCALE_MEMORY_WEIGHTED_RATIOS)));
+
+ MemoryDistributor dist = new MemoryDistributor(2, 2, conf);
+
+ dist.setJvmMemory(10000l);
+
+ // First request - ScatterGatherShuffleInput
+ MemoryUpdateCallbackForTest e1Callback = new MemoryUpdateCallbackForTest();
+ InputContext e1InputContext1 = createTestInputContext();
+ InputDescriptor e1InDesc1 =
createTestInputDescriptor(OrderedGroupedKVInput.class);
+ dist.requestMemory(10000, e1Callback, e1InputContext1, e1InDesc1);
+
+ // Second request - BroadcastInput
+ MemoryUpdateCallbackForTest e2Callback = new MemoryUpdateCallbackForTest();
+ InputContext e2InputContext2 = createTestInputContext();
+ InputDescriptor e2InDesc2 =
createTestInputDescriptor(UnorderedKVInput.class);
+ dist.requestMemory(10000, e2Callback, e2InputContext2, e2InDesc2);
+
+ // Third request - randomOutput (simulates MROutput)
+ MemoryUpdateCallbackForTest e3Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e3OutputContext1 = createTestOutputContext();
+ OutputDescriptor e3OutDesc1 = createTestOutputDescriptor();
+ dist.requestMemory(10000, e3Callback, e3OutputContext1, e3OutDesc1);
+
+ // Fourth request - OnFileSortedOutput
+ MemoryUpdateCallbackForTest e4Callback = new MemoryUpdateCallbackForTest();
+ OutputContext e4OutputContext2 = createTestOutputContext();
+ OutputDescriptor e4OutDesc2 =
createTestOutputDescriptor(OrderedPartitionedKVOutput.class);
+ dist.requestMemory(10000, e4Callback, e4OutputContext2, e4OutDesc2);
+
+ // Fifth request - Processor
+ MemoryUpdateCallbackForTest e5Callback = new MemoryUpdateCallbackForTest();
+ ProcessorContext e5ProcContext = createTestProcessortContext();
+ ProcessorDescriptor e5ProcDesc = createTestProcessorDescriptor();
+ dist.requestMemory(10000, e5Callback, e5ProcContext, e5ProcDesc);
+
+ dist.makeInitialAllocations();
+
+ // Total available: 80% of 10K = 8000
+ // 5 requests (weight) - 10K (3), 10K(1), 10K(1), 10K(2), 10K(1)
+ // Overlap input and output memory
+ assertEquals(3000, e1Callback.assigned);
+ assertEquals(1000, e2Callback.assigned);
+ assertEquals(2333, e3Callback.assigned);
+ assertEquals(4666, e4Callback.assigned);
+ assertEquals(1000, e5Callback.assigned);
+ }
+
private static class MemoryUpdateCallbackForTest extends
MemoryUpdateCallback {
long assigned = -1000;