Repository: tez Updated Branches: refs/heads/master 770e3058a -> 2213c109e
http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java index a72dafa..5511b72 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java @@ -44,6 +44,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import org.apache.tez.common.ReflectionUtils; import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.dag.records.TezTaskAttemptID; @@ -52,7 +53,6 @@ import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.InputFrameworkInterface; import org.apache.tez.runtime.api.LogicalIOProcessor; import org.apache.tez.runtime.api.LogicalInput; -import org.apache.tez.runtime.api.LogicalInputFrameworkInterface; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.LogicalOutputFrameworkInterface; import org.apache.tez.runtime.api.MergedLogicalInput; @@ -60,6 +60,7 @@ import org.apache.tez.runtime.api.Output; import org.apache.tez.runtime.api.OutputFrameworkInterface; import org.apache.tez.runtime.api.Processor; import org.apache.tez.runtime.api.TezInputContext; +import org.apache.tez.runtime.api.TezMergedInputContext; import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.api.TezProcessorContext; import org.apache.tez.runtime.api.impl.EventMetaData; @@ -368,17 +369,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { public Void call() throws Exception { LOG.info("Initializing Input using InputSpec: " + inputSpec); String edgeName = inputSpec.getSourceVertexName(); - LogicalInput input = createInput(inputSpec); - TezInputContext inputContext = createInputContext(input, inputSpec, inputIndex); + TezInputContext inputContext = createInputContext(inputsMap, inputSpec, inputIndex); + LogicalInput input = createInput(inputSpec, inputContext); + inputsMap.put(edgeName, input); inputContextMap.put(edgeName, inputContext); - if (input instanceof LogicalInputFrameworkInterface) { - ((LogicalInputFrameworkInterface) input).setNumPhysicalInputs(inputSpec - .getPhysicalEdgeCount()); - } LOG.info("Initializing Input with src edge: " + edgeName); - List<Event> events = ((InputFrameworkInterface)input).initialize(inputContext); + List<Event> events = ((InputFrameworkInterface)input).initialize(); sendTaskGeneratedEvents(events, EventProducerConsumerType.INPUT, inputContext.getTaskVertexName(), inputContext.getSourceVertexName(), taskSpec.getTaskAttemptID()); @@ -419,17 +417,14 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { public Void call() throws Exception { LOG.info("Initializing Output using OutputSpec: " + outputSpec); String edgeName = outputSpec.getDestinationVertexName(); - LogicalOutput output = createOutput(outputSpec); TezOutputContext outputContext = createOutputContext(outputSpec, outputIndex); + LogicalOutput output = createOutput(outputSpec, outputContext); + outputsMap.put(edgeName, output); outputContextMap.put(edgeName, outputContext); - if (output instanceof LogicalOutputFrameworkInterface) { - ((LogicalOutputFrameworkInterface) output).setNumPhysicalOutputs(outputSpec - .getPhysicalEdgeCount()); - } LOG.info("Initializing Output with dest edge: " + edgeName); - List<Event> events = ((OutputFrameworkInterface)output).initialize(outputContext); + List<Event> events = ((OutputFrameworkInterface)output).initialize(); sendTaskGeneratedEvents(events, EventProducerConsumerType.OUTPUT, outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(), taskSpec.getTaskAttemptID()); @@ -451,15 +446,18 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { groupInputsMap = new ConcurrentHashMap<String, MergedLogicalInput>(groupInputSpecs.size()); for (GroupInputSpec groupInputSpec : groupInputSpecs) { LOG.info("Initializing GroupInput using GroupInputSpec: " + groupInputSpec); - MergedLogicalInput groupInput = (MergedLogicalInput) createInputFromDescriptor( - groupInputSpec.getMergedInputDescriptor()); - List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size()); - for (String groupVertex : groupInputSpec.getGroupVertices()) { - inputs.add(inputsMap.get(groupVertex)); - } - groupInput.initialize(inputs, new TezMergedInputContextImpl( - groupInputSpec.getMergedInputDescriptor().getUserPayload(), - groupInput, inputReadyTracker, localDirs)); + TezMergedInputContext mergedInputContext = + new TezMergedInputContextImpl(groupInputSpec.getMergedInputDescriptor().getUserPayload(), + groupInputSpec.getGroupName(), groupInputsMap, inputReadyTracker, localDirs); + List<Input> inputs = Lists.newArrayListWithCapacity(groupInputSpec.getGroupVertices().size()); + for (String groupVertex : groupInputSpec.getGroupVertices()) { + inputs.add(inputsMap.get(groupVertex)); + } + + MergedLogicalInput groupInput = + (MergedLogicalInput) createMergedInput(groupInputSpec.getMergedInputDescriptor(), + mergedInputContext, inputs); + groupInputsMap.put(groupInputSpec.getGroupName(), groupInput); } } @@ -475,7 +473,8 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { + processorDescriptor.getClassName()); } - private TezInputContext createInputContext(Input input, InputSpec inputSpec, int inputIndex) { + private TezInputContext createInputContext(Map<String, LogicalInput> inputMap, + InputSpec inputSpec, int inputIndex) { TezInputContext inputContext = new TezInputContextImpl(tezConf, localDirs, appAttemptNumber, tezUmbilical, taskSpec.getDAGName(), taskSpec.getVertexName(), @@ -483,7 +482,7 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { tezCounters, inputIndex, inputSpec.getInputDescriptor().getUserPayload(), this, serviceConsumerMetadata, System.getenv(), initialMemoryDistributor, - inputSpec.getInputDescriptor(), input, inputReadyTracker, objectRegistry); + inputSpec.getInputDescriptor(), inputMap, inputReadyTracker, objectRegistry); return inputContext; } @@ -510,25 +509,41 @@ public class LogicalIOProcessorRuntimeTask extends RuntimeTask { return processorContext; } - private LogicalInput createInput(InputSpec inputSpec) { + private LogicalInput createInput(InputSpec inputSpec, TezInputContext inputContext) { LOG.info("Creating Input"); - return createInputFromDescriptor(inputSpec.getInputDescriptor()); + InputDescriptor inputDesc = inputSpec.getInputDescriptor(); + Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(), + new Class[]{TezInputContext.class, Integer.TYPE}, + new Object[]{inputContext, inputSpec.getPhysicalEdgeCount()}); + if (!(input instanceof LogicalInput)) { + throw new TezUncheckedException(inputDesc.getClass().getName() + + " is not a sub-type of LogicalInput." + + " Only LogicalInput sub-types supported by LogicalIOProcessor."); + } + return (LogicalInput) input; } - private LogicalInput createInputFromDescriptor(InputDescriptor inputDesc) { - Input input = ReflectionUtils.createClazzInstance(inputDesc.getClassName()); + private LogicalInput createMergedInput(InputDescriptor inputDesc, + TezMergedInputContext mergedInputContext, + List<Input> constituentInputs) { + LogicalInput input = ReflectionUtils.createClazzInstance(inputDesc.getClassName(), + new Class[]{TezMergedInputContext.class, List.class}, + new Object[]{mergedInputContext, constituentInputs}); if (!(input instanceof LogicalInput)) { throw new TezUncheckedException(inputDesc.getClass().getName() + " is not a sub-type of LogicalInput." + " Only LogicalInput sub-types supported by LogicalIOProcessor."); } - return (LogicalInput)input; + return input; } - private LogicalOutput createOutput(OutputSpec outputSpec) { + private LogicalOutput createOutput(OutputSpec outputSpec, TezOutputContext outputContext) { LOG.info("Creating Output"); - Output output = ReflectionUtils.createClazzInstance(outputSpec - .getOutputDescriptor().getClassName()); + OutputDescriptor outputDesc = outputSpec.getOutputDescriptor(); + Output output = ReflectionUtils.createClazzInstance(outputDesc.getClassName(), + new Class[]{TezOutputContext.class, Integer.TYPE}, + new Object[]{outputContext, outputSpec.getPhysicalEdgeCount()}); + if (!(output instanceof LogicalOutput)) { throw new TezUncheckedException(output.getClass().getName() + " is not a sub-type of LogicalOutput." http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java index e98d694..627e830 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezInputContextImpl.java @@ -40,7 +40,7 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.InputReadyTracker; import org.apache.tez.runtime.RuntimeTask; import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; import org.apache.tez.runtime.common.objectregistry.ObjectRegistry; @@ -53,24 +53,27 @@ public class TezInputContextImpl extends TezTaskContextImpl private final String sourceVertexName; private final EventMetaData sourceInfo; private final int inputIndex; - private final Input input; + private final Map<String, LogicalInput> inputs; private final InputReadyTracker inputReadyTracker; @Private - public TezInputContextImpl(Configuration conf, String[] workDirs, int appAttemptNumber, - TezUmbilical tezUmbilical, String dagName, String taskVertexName, - String sourceVertexName, TezTaskAttemptID taskAttemptID, - TezCounters counters, int inputIndex, @Nullable byte[] userPayload, - RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata, - Map<String, String> auxServiceEnv, MemoryDistributor memDist, - InputDescriptor inputDescriptor, Input input, InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) { + public TezInputContextImpl(Configuration conf, String[] workDirs, + int appAttemptNumber, + TezUmbilical tezUmbilical, String dagName, String taskVertexName, + String sourceVertexName, TezTaskAttemptID taskAttemptID, + TezCounters counters, int inputIndex, @Nullable byte[] userPayload, + RuntimeTask runtimeTask, + Map<String, ByteBuffer> serviceConsumerMetadata, + Map<String, String> auxServiceEnv, MemoryDistributor memDist, + InputDescriptor inputDescriptor, Map<String, LogicalInput> inputs, + InputReadyTracker inputReadyTracker, ObjectRegistry objectRegistry) { super(conf, workDirs, appAttemptNumber, dagName, taskVertexName, taskAttemptID, wrapCounters(counters, taskVertexName, sourceVertexName, conf), runtimeTask, tezUmbilical, serviceConsumerMetadata, auxServiceEnv, memDist, inputDescriptor, objectRegistry); checkNotNull(inputIndex, "inputIndex is null"); checkNotNull(sourceVertexName, "sourceVertexName is null"); - checkNotNull(input, "input is null"); + checkNotNull(inputs, "input map is null"); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload); this.inputIndex = inputIndex; @@ -78,7 +81,7 @@ public class TezInputContextImpl extends TezTaskContextImpl this.sourceInfo = new EventMetaData( EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName, taskAttemptID); - this.input = input; + this.inputs = inputs; this.inputReadyTracker = inputReadyTracker; } @@ -126,6 +129,6 @@ public class TezInputContextImpl extends TezTaskContextImpl @Override public void inputIsReady() { - inputReadyTracker.setInputIsReady(input); + inputReadyTracker.setInputIsReady(inputs.get(sourceVertexName)); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java index 8582307..cf55d39 100644 --- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java +++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/api/impl/TezMergedInputContextImpl.java @@ -4,26 +4,34 @@ import static com.google.common.base.Preconditions.checkNotNull; import javax.annotation.Nullable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.tez.common.TezUserPayload; import org.apache.tez.dag.api.DagTypeConverters; import org.apache.tez.runtime.InputReadyTracker; import org.apache.tez.runtime.api.Input; +import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.TezMergedInputContext; public class TezMergedInputContextImpl implements TezMergedInputContext { private final TezUserPayload userPayload; - private final Input input; + private final String groupInputName; + private final Map<String, MergedLogicalInput> groupInputsMap; private final InputReadyTracker inputReadyTracker; private final String[] workDirs; - public TezMergedInputContextImpl(@Nullable byte[] userPayload, - Input input, InputReadyTracker inputReadyTracker, String[] workDirs) { - checkNotNull(input, "input is null"); + public TezMergedInputContextImpl(@Nullable byte[] userPayload, String groupInputName, + Map<String, MergedLogicalInput> groupInputsMap, + InputReadyTracker inputReadyTracker, String[] workDirs) { + checkNotNull(groupInputName, "groupInputName is null"); + checkNotNull(groupInputsMap, "input-group map is null"); checkNotNull(inputReadyTracker, "inputReadyTracker is null"); + this.groupInputName = groupInputName; + this.groupInputsMap = groupInputsMap; this.userPayload = DagTypeConverters.convertToTezUserPayload(userPayload); - this.input = input; this.inputReadyTracker = inputReadyTracker; this.workDirs = workDirs; } @@ -36,7 +44,7 @@ public class TezMergedInputContextImpl implements TezMergedInputContext { @Override public void inputIsReady() { - inputReadyTracker.setInputIsReady(input); + inputReadyTracker.setInputIsReady(groupInputsMap.get(groupInputName)); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java index 03e597b..352d09c 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestInputReadyTracker.java @@ -23,7 +23,9 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.tez.runtime.api.AbstractLogicalInput; @@ -31,6 +33,7 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezMergedInputContext; import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl; import org.junit.Test; @@ -142,8 +145,7 @@ public class TestInputReadyTracker { ImmediatelyReadyInputForTest input3 = new ImmediatelyReadyInputForTest(inputReadyTracker); ControlledReadyInputForTest input4 = new ControlledReadyInputForTest(inputReadyTracker); - AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(); - AllMergedInputForTest group2 = new AllMergedInputForTest(); + List<Input> group1Inputs = new ArrayList<Input>(); group1Inputs.add(input1); @@ -153,9 +155,15 @@ public class TestInputReadyTracker { group2Inputs.add(input3); group2Inputs.add(input4); - group1.initialize(group1Inputs, new TezMergedInputContextImpl(null, group1, inputReadyTracker, null)); - group2.initialize(group2Inputs, new TezMergedInputContextImpl(null, group2, inputReadyTracker, null)); - + Map<String, MergedLogicalInput> mergedInputMap = new HashMap<String, MergedLogicalInput>(); + TezMergedInputContext mergedInputContext1 = new TezMergedInputContextImpl(null, "group1", mergedInputMap, inputReadyTracker, null); + TezMergedInputContext mergedInputContext2 = new TezMergedInputContextImpl(null, "group2", mergedInputMap, inputReadyTracker, null); + + AnyOneMergedInputForTest group1 = new AnyOneMergedInputForTest(mergedInputContext1, group1Inputs); + AllMergedInputForTest group2 = new AllMergedInputForTest(mergedInputContext2, group2Inputs); + mergedInputMap.put("group1", group1); + mergedInputMap.put("group2", group2); + // Register groups with tracker List<MergedLogicalInput> groups = Lists.newArrayList(group1, group2); inputReadyTracker.setGroupedInputs(groups); @@ -210,6 +218,7 @@ public class TestInputReadyTracker { private volatile boolean isReady = false; ImmediatelyReadyInputForTest(InputReadyTracker inputReadyTracker) { + super(null, 0); isReady = true; inputReadyTracker.setInputIsReady(this); } @@ -244,6 +253,7 @@ public class TestInputReadyTracker { private InputReadyTracker inputReadyTracker; ControlledReadyInputForTest(InputReadyTracker inputReadyTracker) { + super(null, 0); this.inputReadyTracker = inputReadyTracker; } @@ -280,7 +290,11 @@ public class TestInputReadyTracker { private static class AnyOneMergedInputForTest extends MergedLogicalInput { private volatile boolean isReady = false; - + + public AnyOneMergedInputForTest(TezMergedInputContext context, List<Input> inputs) { + super(context, inputs); + } + @Override public Reader getReader() throws Exception { return null; @@ -297,7 +311,11 @@ public class TestInputReadyTracker { private volatile boolean isReady = false; private Set<Input> readyInputs = Sets.newHashSet(); - + + public AllMergedInputForTest(TezMergedInputContext context, List<Input> inputs) { + super(context, inputs); + } + @Override public Reader getReader() throws Exception { return null; http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java ---------------------------------------------------------------------- diff --git a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java index a2876a7..dcf3303 100644 --- a/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java +++ b/tez-runtime-internals/src/test/java/org/apache/tez/runtime/TestLogicalIOProcessorRuntimeTask.java @@ -41,6 +41,8 @@ import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezInputContext; +import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.api.impl.InputSpec; import org.apache.tez.runtime.api.impl.OutputSpec; @@ -168,7 +170,8 @@ public class TestLogicalIOProcessorRuntimeTask { public static volatile int startCount = 0; - public TestInput() { + public TestInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); } @Override @@ -204,9 +207,11 @@ public class TestLogicalIOProcessorRuntimeTask { public static volatile int startCount = 0; - public TestOutput() { + public TestOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); } + @Override public List<Event> initialize() throws Exception { getContext().requestInitialMemory(0, null); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java index e5439de..c19f3a6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java @@ -42,13 +42,16 @@ public class InputReadyVertexManager extends VertexManagerPlugin { private static final Log LOG = LogFactory.getLog(InputReadyVertexManager.class); - VertexManagerPluginContext context; Map<String, SourceVertexInfo> srcVertexInfo = Maps.newHashMap(); boolean taskIsStarted[]; int oneToOneSrcTasksDoneCount[]; Container oneToOneLocationHints[]; int numOneToOneEdges; - + + public InputReadyVertexManager(VertexManagerPluginContext context) { + super(context); + } + class SourceVertexInfo { EdgeProperty edgeProperty; int numTasks; @@ -64,25 +67,24 @@ public class InputReadyVertexManager extends VertexManagerPlugin { } @Override - public void initialize(VertexManagerPluginContext context) { - this.context = context; + public void initialize() { } @Override public void onVertexStarted(Map<String, List<Integer>> completions) { - int numManagedTasks = context.getVertexNumTasks(context.getVertexName()); - LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + context.getVertexName()); + int numManagedTasks = getContext().getVertexNumTasks(getContext().getVertexName()); + LOG.info("Managing " + numManagedTasks + " tasks for vertex: " + getContext().getVertexName()); taskIsStarted = new boolean[numManagedTasks]; // find out about all input edge types. If there is a custom edge then // TODO Until TEZ-1013 we cannot handle custom input formats - Map<String, EdgeProperty> edges = context.getInputVertexEdgeProperties(); + Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties(); int oneToOneSrcTaskCount = 0; numOneToOneEdges = 0; for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) { EdgeProperty edgeProp = entry.getValue(); String srcVertex = entry.getKey(); - int numSrcTasks = context.getVertexNumTasks(srcVertex); + int numSrcTasks = getContext().getVertexNumTasks(srcVertex); switch (edgeProp.getDataMovementType()) { case CUSTOM: throw new TezUncheckedException("Cannot handle custom edge"); @@ -145,7 +147,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin { oneToOneSrcTasksDoneCount[taskId.intValue()]++; // keep the latest container that completed as the location hint // After there is standard data size info available then use it - oneToOneLocationHints[taskId.intValue()] = context.getTaskContainer(vertex, taskId); + oneToOneLocationHints[taskId.intValue()] = getContext().getTaskContainer(vertex, taskId); } } @@ -174,7 +176,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin { if (numOneToOneEdges == 0) { // no 1-1 dependency. Start all tasks int numTasks = taskIsStarted.length; - LOG.info("Starting all " + numTasks + "tasks for vertex: " + context.getVertexName()); + LOG.info("Starting all " + numTasks + "tasks for vertex: " + getContext().getVertexName()); tasksToStart = Lists.newArrayListWithCapacity(numTasks); for (int i=0; i<numTasks; ++i) { taskIsStarted[i] = true; @@ -191,7 +193,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin { locationHint = new TaskLocationHint(oneToOneLocationHints[i].getId()); } LOG.info("Starting task " + i + " for vertex: " - + context.getVertexName() + " with location: " + + getContext().getVertexName() + " with location: " + ((locationHint != null) ? locationHint.getAffinitizedContainer() : "null")); tasksToStart.add(new TaskWithLocationHint(new Integer(i), locationHint)); } @@ -199,7 +201,7 @@ public class InputReadyVertexManager extends VertexManagerPlugin { } if (tasksToStart != null && !tasksToStart.isEmpty()) { - context.scheduleVertexTasks(tasksToStart); + getContext().scheduleVertexTasks(tasksToStart); } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java index 2dc42d0..83c96e6 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/ShuffleVertexManager.java @@ -105,7 +105,6 @@ public class ShuffleVertexManager extends VertexManagerPlugin { private static final Log LOG = LogFactory.getLog(ShuffleVertexManager.class); - VertexManagerPluginContext context; float slowStartMinSrcCompletionFraction; float slowStartMaxSrcCompletionFraction; long desiredTaskInputDataSize = 1024*1024*100L; @@ -121,11 +120,11 @@ public class ShuffleVertexManager extends VertexManagerPlugin { Map<String, Set<Integer>> bipartiteSources = Maps.newHashMap(); long completedSourceTasksOutputSize = 0; - - public ShuffleVertexManager() { + + public ShuffleVertexManager(VertexManagerPluginContext context) { + super(context); } - - + public static class CustomShuffleEdgeManager extends EdgeManager { int numSourceTaskOutputs; int numDestinationTasks; @@ -133,13 +132,14 @@ public class ShuffleVertexManager extends VertexManagerPlugin { int remainderRangeForLastShuffler; int numSourceTasks; - public CustomShuffleEdgeManager() { + public CustomShuffleEdgeManager(EdgeManagerContext context) { + super(context); } @Override - public void initialize(EdgeManagerContext edgeManagerContext) { + public void initialize() { // Nothing to do. This class isn't currently designed to be used at the DAG API level. - byte[] userPayload = edgeManagerContext.getUserPayload(); + byte[] userPayload = getContext().getUserPayload(); if (userPayload == null || userPayload.length == 0) { throw new RuntimeException("Could not initialize CustomShuffleEdgeManager" @@ -298,12 +298,12 @@ public class ShuffleVertexManager extends VertexManagerPlugin { @Override public void onVertexStarted(Map<String, List<Integer>> completions) { pendingTasks = Lists.newArrayListWithCapacity( - context.getVertexNumTasks(context.getVertexName())); + getContext().getVertexNumTasks(getContext().getVertexName())); // track the tasks in this vertex updatePendingTasks(); updateSourceTaskCount(); - LOG.info("OnVertexStarted vertex: " + context.getVertexName() + + LOG.info("OnVertexStarted vertex: " + getContext().getVertexName() + " with " + totalNumSourceTasks + " source tasks and " + totalTasksToSchedule + " pending tasks"); @@ -357,7 +357,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { void updatePendingTasks() { pendingTasks.clear(); - for (int i=0; i<context.getVertexNumTasks(context.getVertexName()); ++i) { + for (int i=0; i<getContext().getVertexNumTasks(getContext().getVertexName()); ++i) { pendingTasks.add(new Integer(i)); } totalTasksToSchedule = pendingTasks.size(); @@ -367,7 +367,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // track source vertices int numSrcTasks = 0; for(String vertex : bipartiteSources.keySet()) { - numSrcTasks += context.getVertexNumTasks(vertex); + numSrcTasks += getContext().getVertexNumTasks(vertex); } totalNumSourceTasks = numSrcTasks; } @@ -410,7 +410,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { int finalTaskParallelism = (remainderRangeForLastShuffler > 0) ? (numShufflersWithBaseRange + 1) : (numShufflersWithBaseRange); - LOG.info("Reduce auto parallelism for vertex: " + context.getVertexName() + LOG.info("Reduce auto parallelism for vertex: " + getContext().getVertexName() + " to " + finalTaskParallelism + " from " + pendingTasks.size() + " . Expected output: " + expectedTotalSourceTasksOutputSize + " based on actual output: " + completedSourceTasksOutputSize @@ -427,7 +427,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { CustomShuffleEdgeManagerConfig edgeManagerConfig = new CustomShuffleEdgeManagerConfig( currentParallelism, finalTaskParallelism, - context.getVertexNumTasks(vertex), basePartitionRange, + getContext().getVertexNumTasks(vertex), basePartitionRange, ((remainderRangeForLastShuffler > 0) ? remainderRangeForLastShuffler : basePartitionRange)); EdgeManagerDescriptor edgeManagerDescriptor = @@ -436,7 +436,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { edgeManagers.put(vertex, edgeManagerDescriptor); } - context.setVertexParallelism(finalTaskParallelism, null, edgeManagers, null); + getContext().setVertexParallelism(finalTaskParallelism, null, edgeManagers, null); updatePendingTasks(); } } @@ -460,7 +460,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { scheduledTasks.add(new TaskWithLocationHint(pendingTasks.get(0), null)); pendingTasks.remove(0); } - context.scheduleVertexTasks(scheduledTasks); + getContext().scheduleVertexTasks(scheduledTasks); } void schedulePendingTasks() { @@ -472,7 +472,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { if (numSourceTasksCompleted == totalNumSourceTasks && numPendingTasks > 0) { LOG.info("All source tasks assigned. " + "Ramping up " + numPendingTasks + - " remaining tasks for vertex: " + context.getVertexName()); + " remaining tasks for vertex: " + getContext().getVertexName()); schedulePendingTasks(numPendingTasks); return; } @@ -515,7 +515,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { // numTasksToSchedule can be -ve if numSourceTasksCompleted does not // does not increase monotonically LOG.info("Scheduling " + numTasksToSchedule + " tasks for vertex: " + - context.getVertexName() + " with totalTasks: " + + getContext().getVertexName() + " with totalTasks: " + totalTasksToSchedule + ". " + numSourceTasksCompleted + " source tasks completed out of " + totalNumSourceTasks + ". SourceTaskCompletedFraction: " + completedSourceTaskFraction + @@ -526,15 +526,13 @@ public class ShuffleVertexManager extends VertexManagerPlugin { } @Override - public void initialize(VertexManagerPluginContext context) { + public void initialize() { Configuration conf; try { - conf = TezUtils.createConfFromUserPayload(context.getUserPayload()); + conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); } catch (IOException e) { throw new TezUncheckedException(e); } - - this.context = context; this.slowStartMinSrcCompletionFraction = conf .getFloat( @@ -571,7 +569,7 @@ public class ShuffleVertexManager extends VertexManagerPlugin { + " desiredTaskIput:" + desiredTaskInputDataSize + " minTasks:" + minTaskParallelism); - Map<String, EdgeProperty> inputs = context.getInputVertexEdgeProperties(); + Map<String, EdgeProperty> inputs = getContext().getInputVertexEdgeProperties(); for(Map.Entry<String, EdgeProperty> entry : inputs.entrySet()) { if (entry.getValue().getDataMovementType() == DataMovementType.SCATTER_GATHER) { String vertex = entry.getKey(); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java index f142ee9..5c2e82f 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java @@ -19,15 +19,22 @@ package org.apache.tez.runtime.library.input; import java.io.IOException; +import java.util.List; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezMergedInputContext; import org.apache.tez.runtime.library.api.KeyValueReader; public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput { + public ConcatenatedMergedKeyValueInput(TezMergedInputContext context, + List<Input> inputs) { + super(context, inputs); + } + public class ConcatenatedMergedKeyValueReader implements KeyValueReader { private int currentReaderIndex = 0; private KeyValueReader currentReader; http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java index 8affa14..3c56d07 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java @@ -19,15 +19,22 @@ package org.apache.tez.runtime.library.input; import java.io.IOException; +import java.util.List; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezMergedInputContext; import org.apache.tez.runtime.library.api.KeyValuesReader; public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput { + public ConcatenatedMergedKeyValuesInput(TezMergedInputContext context, + List<Input> inputs) { + super(context, inputs); + } + public class ConcatenatedMergedKeyValuesReader implements KeyValuesReader { private int currentReaderIndex = 0; private KeyValuesReader currentReader; http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java index 93ca93e..e1f825e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/LocalMergedInput.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.tez.common.TezUtils; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle; /** @@ -32,6 +33,10 @@ import org.apache.tez.runtime.library.common.localshuffle.LocalShuffle; */ public class LocalMergedInput extends ShuffledMergedInputLegacy { + public LocalMergedInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + @Override public List<Event> initialize() throws IOException { getContext().requestInitialMemory(0l, null); // mandatory call. http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java index 6c50b93..01b9de7 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInput.java @@ -39,6 +39,7 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.library.api.KeyValuesReader; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -79,6 +80,10 @@ public class ShuffledMergedInput extends AbstractLogicalInput { private final AtomicBoolean isStarted = new AtomicBoolean(false); + public ShuffledMergedInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + @Override public synchronized List<Event> initialize() throws IOException { this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java index 2633968..612bab5 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledMergedInputLegacy.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.util.Progress; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; @LimitedPrivate("mapreduce") @@ -31,6 +32,10 @@ public class ShuffledMergedInputLegacy extends ShuffledMergedInput { private final Progress progress = new Progress(); + public ShuffledMergedInputLegacy(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + @Private public TezRawKeyValueIterator getIterator() throws IOException, InterruptedException { // wait for input so that iterator is available http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java index e2d73e6..ff076ca 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ShuffledUnorderedKVInput.java @@ -38,6 +38,7 @@ import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.library.api.KeyValueReader; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.ConfigUtils; @@ -68,7 +69,8 @@ public class ShuffledUnorderedKVInput extends AbstractLogicalInput { private SimpleFetchedInputAllocator inputManager; private ShuffleEventHandler inputEventHandler; - public ShuffledUnorderedKVInput() { + public ShuffledUnorderedKVInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java index 66273d2..197664e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/SortedGroupedMergedInput.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.RawComparator; import org.apache.tez.runtime.api.Input; import org.apache.tez.runtime.api.MergedLogicalInput; +import org.apache.tez.runtime.api.TezMergedInputContext; import org.apache.tez.runtime.library.api.KeyValuesReader; /** @@ -48,6 +49,10 @@ public class SortedGroupedMergedInput extends MergedLogicalInput { private final Set<Input> completedInputs = Collections .newSetFromMap(new IdentityHashMap<Input, Boolean>()); + public SortedGroupedMergedInput(TezMergedInputContext context, List<Input> inputs) { + super(context, inputs); + } + @Override public KeyValuesReader getReader() throws Exception { return new SortedGroupedMergedKeyValuesReader(getInputs()); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java index d1c5fc0..550448e 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/LocalOnFileSorterOutput.java @@ -26,12 +26,16 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.library.common.task.local.output.TezTaskOutput; public class LocalOnFileSorterOutput extends OnFileSortedOutput { private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class); + public LocalOnFileSorterOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java index f10cb20..8536746 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileSortedOutput.java @@ -43,9 +43,9 @@ import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.api.events.CompositeDataMovementEvent; import org.apache.tez.runtime.api.events.VertexManagerEvent; -import org.apache.tez.runtime.library.api.KeyValueWriter; import org.apache.tez.runtime.library.api.KeyValuesWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler; @@ -74,6 +74,10 @@ public class OnFileSortedOutput extends AbstractLogicalOutput { private boolean sendEmptyPartitionDetails; private final AtomicBoolean isStarted = new AtomicBoolean(false); + public OnFileSortedOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + @Override public synchronized List<Event> initialize() throws IOException { this.startTime = System.nanoTime(); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java index e9956e4..32592bb 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedKVOutput.java @@ -38,6 +38,7 @@ import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezUncheckedException; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.library.api.KeyValuesWriter; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; @@ -47,7 +48,6 @@ import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataMovem import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads.DataProto; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.protobuf.ByteString; @@ -62,9 +62,11 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput { private boolean dataViaEventsEnabled; private int dataViaEventsMaxSize; - public OnFileUnorderedKVOutput() { + public OnFileUnorderedKVOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); } + @Override public synchronized List<Event> initialize() throws Exception { @@ -153,12 +155,6 @@ public class OnFileUnorderedKVOutput extends AbstractLogicalOutput { return events; } - @Override - public synchronized void setNumPhysicalOutputs(int numOutputs) { - Preconditions.checkArgument(numOutputs == 1, - "Number of outputs can only be 1 for " + this.getClass().getName()); - } - @VisibleForTesting @Private String getHost() { http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java index f666924..75d6e00 100644 --- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java +++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OnFileUnorderedPartitionedKVOutput.java @@ -51,37 +51,33 @@ public class OnFileUnorderedPartitionedKVOutput extends AbstractLogicalOutput { private static final Log LOG = LogFactory.getLog(OnFileUnorderedPartitionedKVOutput.class); - private TezOutputContext outputContext; private Configuration conf; - private int numPhysicalOutputs; private MemoryUpdateCallbackHandler memoryUpdateCallbackHandler; private UnorderedPartitionedKVWriter kvWriter; private final AtomicBoolean isStarted = new AtomicBoolean(false); + public OnFileUnorderedPartitionedKVOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + @Override - public synchronized List<Event> initialize(TezOutputContext outputContext) throws Exception { - this.outputContext = outputContext; - this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload()); - this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, outputContext.getWorkDirs()); + public synchronized List<Event> initialize() throws Exception { + this.conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload()); + this.conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, getContext().getWorkDirs()); this.conf.setInt(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_NUM_EXPECTED_PARTITIONS, - this.numPhysicalOutputs); + getNumPhysicalOutputs()); this.memoryUpdateCallbackHandler = new MemoryUpdateCallbackHandler(); - outputContext.requestInitialMemory( + getContext().requestInitialMemory( UnorderedPartitionedKVWriter.getInitialMemoryRequirement(conf, - outputContext.getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler); + getContext().getTotalMemoryAvailableToTask()), memoryUpdateCallbackHandler); return Collections.emptyList(); } @Override - public List<Event> initialize() throws Exception { - return null; - } - - @Override public synchronized void start() throws Exception { if (!isStarted.get()) { memoryUpdateCallbackHandler.validateUpdateReceived(); - this.kvWriter = new UnorderedPartitionedKVWriter(outputContext, conf, numPhysicalOutputs, + this.kvWriter = new UnorderedPartitionedKVWriter(getContext(), conf, getNumPhysicalOutputs(), memoryUpdateCallbackHandler.getMemoryAssigned()); isStarted.set(true); } @@ -106,11 +102,6 @@ public class OnFileUnorderedPartitionedKVOutput extends AbstractLogicalOutput { } } - @Override - public synchronized void setNumPhysicalOutputs(int numOutputs) { - this.numPhysicalOutputs = numOutputs; - } - private static final Set<String> confKeys = new HashSet<String>(); static { http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java index 65e63d6..09b7d4b 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestInputReadyVertexManager.java @@ -78,8 +78,8 @@ public class TestInputReadyVertexManager { Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - InputReadyVertexManager manager = new InputReadyVertexManager(); - manager.initialize(mockContext); + InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); + manager.initialize(); manager.onVertexStarted(initialCompletions); manager.onSourceTaskCompleted(mockSrcVertexId1, 1); verify(mockContext, times(0)).scheduleVertexTasks(anyList()); @@ -121,8 +121,8 @@ public class TestInputReadyVertexManager { Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); - InputReadyVertexManager manager = new InputReadyVertexManager(); - manager.initialize(mockContext); + InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); + manager.initialize(); manager.onVertexStarted(initialCompletions); verify(mockContext, times(1)).scheduleVertexTasks(requestCaptor.capture()); Assert.assertEquals(1, requestCaptor.getValue().size()); @@ -196,9 +196,9 @@ public class TestInputReadyVertexManager { Map<String, List<Integer>> initialCompletions = Maps.newHashMap(); // 1-1 sources do not match managed tasks - InputReadyVertexManager manager = new InputReadyVertexManager(); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(4); - manager.initialize(mockContext); + InputReadyVertexManager manager = new InputReadyVertexManager(mockContext); + manager.initialize(); try { manager.onVertexStarted(initialCompletions); Assert.assertTrue("Should have exception", false); @@ -207,10 +207,10 @@ public class TestInputReadyVertexManager { } // 1-1 sources do not match - manager = new InputReadyVertexManager(); when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(4); - manager.initialize(mockContext); + manager = new InputReadyVertexManager(mockContext); + manager.initialize(); try { manager.onVertexStarted(initialCompletions); Assert.assertTrue("Should have exception", false); @@ -221,8 +221,8 @@ public class TestInputReadyVertexManager { initialCompletions.put(mockSrcVertexId1, Collections.singletonList(0)); initialCompletions.put(mockSrcVertexId2, Collections.singletonList(0)); when(mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn(3); - manager = new InputReadyVertexManager(); - manager.initialize(mockContext); + manager = new InputReadyVertexManager(mockContext); + manager.initialize(); manager.onVertexStarted(initialCompletions); // all 1-1 0's done but not scheduled because v1 is not done manager.onSourceTaskCompleted(mockSrcVertexId3, 0); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java index 5c3a83a..caed74a 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/dag/library/vertexmanager/TestShuffleVertexManager.java @@ -125,10 +125,10 @@ public class TestShuffleVertexManager { newEdgeManagers.clear(); for (Entry<String, EdgeManagerDescriptor> entry : ((Map<String, EdgeManagerDescriptor>)invocation.getArguments()[2]).entrySet()) { - EdgeManager edgeManager = ReflectionUtils.createClazzInstance( - entry.getValue().getClassName()); + + final byte[] userPayload = entry.getValue().getUserPayload(); - edgeManager.initialize(new EdgeManagerContext() { + EdgeManagerContext emContext = new EdgeManagerContext() { @Override public byte[] getUserPayload() { return userPayload; @@ -153,7 +153,11 @@ public class TestShuffleVertexManager { public int getDestinationVertexNumTasks() { return 0; } - }); + }; + EdgeManager edgeManager = ReflectionUtils + .createClazzInstance(entry.getValue().getClassName(), + new Class[]{EdgeManagerContext.class}, new Object[]{emContext}); + edgeManager.initialize(); newEdgeManagers.put(entry.getKey(), edgeManager); } return null; @@ -486,8 +490,7 @@ public class TestShuffleVertexManager { private ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, float min, float max) { conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, min); - conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max); - ShuffleVertexManager manager = new ShuffleVertexManager(); + conf.setFloat(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, max); byte[] payload; try { payload = TezUtils.createUserPayloadFromConf(conf); @@ -495,7 +498,8 @@ public class TestShuffleVertexManager { throw new RuntimeException(e); } when(context.getUserPayload()).thenReturn(payload); - manager.initialize(context); + ShuffleVertexManager manager = new ShuffleVertexManager(context); + manager.initialize(); return manager; } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java index d8ebcb6..890b342 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestSortedGroupedMergedInput.java @@ -24,15 +24,17 @@ import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import java.io.IOException; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import org.apache.hadoop.io.RawComparator; import org.apache.tez.runtime.InputReadyTracker; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Input; -import org.apache.tez.runtime.api.TezInputContext; +import org.apache.tez.runtime.api.MergedLogicalInput; import org.apache.tez.runtime.api.TezMergedInputContext; import org.apache.tez.runtime.api.impl.TezMergedInputContextImpl; import org.apache.tez.runtime.library.api.KeyValuesReader; @@ -40,14 +42,13 @@ import org.junit.Test; public class TestSortedGroupedMergedInput { - TezMergedInputContext createMergedInputContext(Input input) { - return new TezMergedInputContextImpl(null, input, mock(InputReadyTracker.class), null); + TezMergedInputContext createMergedInputContext() { + return new TezMergedInputContextImpl(null, "mergedInputName", new HashMap<String, MergedLogicalInput>(), + mock(InputReadyTracker.class), null); } @Test public void testSimple() throws Exception { - SortedGroupedMergedInput input = new SortedGroupedMergedInput(); - SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 }, new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } }); @@ -65,8 +66,8 @@ public class TestSortedGroupedMergedInput { sInputs.add(sInput1); sInputs.add(sInput2); sInputs.add(sInput3); + SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs); - input.initialize(sInputs, createMergedInputContext(input)); KeyValuesReader kvsReader = input.getReader(); int keyCount = 0; while (kvsReader.next()) { @@ -86,7 +87,7 @@ public class TestSortedGroupedMergedInput { @Test public void testSkippedKey() throws Exception { - SortedGroupedMergedInput input = new SortedGroupedMergedInput(); + SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 }, new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } }); @@ -106,7 +107,8 @@ public class TestSortedGroupedMergedInput { sInputs.add(sInput2); sInputs.add(sInput3); - input.initialize(sInputs, createMergedInputContext(input)); + SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs); + KeyValuesReader kvsReader = input.getReader(); int keyCount = 0; while (kvsReader.next()) { @@ -129,7 +131,6 @@ public class TestSortedGroupedMergedInput { @Test public void testPartialValuesSkip() throws Exception { - SortedGroupedMergedInput input = new SortedGroupedMergedInput(); SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3 }, new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 } }); @@ -149,7 +150,7 @@ public class TestSortedGroupedMergedInput { sInputs.add(sInput2); sInputs.add(sInput3); - input.initialize(sInputs, createMergedInputContext(input)); + SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs); KeyValuesReader kvsReader = input.getReader(); int keyCount = 0; while (kvsReader.next()) { @@ -176,7 +177,6 @@ public class TestSortedGroupedMergedInput { @Test public void testOrdering() throws Exception { - SortedGroupedMergedInput input = new SortedGroupedMergedInput(); SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 2, 4 }, new int[][] { { 2, 2 }, { 4, 4 } }); @@ -196,7 +196,7 @@ public class TestSortedGroupedMergedInput { sInputs.add(sInput2); sInputs.add(sInput3); - input.initialize(sInputs, createMergedInputContext(input)); + SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs); KeyValuesReader kvsReader = input.getReader(); int keyCount = 0; while (kvsReader.next()) { @@ -228,7 +228,6 @@ public class TestSortedGroupedMergedInput { @Test public void testSkippedKey2() throws Exception { - SortedGroupedMergedInput input = new SortedGroupedMergedInput(); SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 2, 4 }, new int[][] { { 2, 2 }, { 4, 4 } }); @@ -248,7 +247,7 @@ public class TestSortedGroupedMergedInput { sInputs.add(sInput2); sInputs.add(sInput3); - input.initialize(sInputs, createMergedInputContext(input)); + SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs); KeyValuesReader kvsReader = input.getReader(); int keyCount = 0; while (kvsReader.next()) { @@ -284,7 +283,6 @@ public class TestSortedGroupedMergedInput { // Reads all values for a key, but doesn't trigger the last hasNext() call. @Test public void testSkippedKey3() throws Exception { - SortedGroupedMergedInput input = new SortedGroupedMergedInput(); SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] { 1, 2, 3, 4 }, new int[][] { { 1, 1 }, { 2, 2 }, { 3, 3 }, {4, 4} }); @@ -304,7 +302,8 @@ public class TestSortedGroupedMergedInput { sInputs.add(sInput2); sInputs.add(sInput3); - input.initialize(sInputs, createMergedInputContext(input)); + SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs); + KeyValuesReader kvsReader = input.getReader(); int keyCount = 0; while (kvsReader.next()) { @@ -330,7 +329,6 @@ public class TestSortedGroupedMergedInput { @Test public void testEmptySources() throws Exception { - SortedGroupedMergedInput input = new SortedGroupedMergedInput(); SortedTestKeyValuesReader kvsReader1 = new SortedTestKeyValuesReader(new int[] {}, new int[][] {}); @@ -350,7 +348,8 @@ public class TestSortedGroupedMergedInput { sInputs.add(sInput2); sInputs.add(sInput3); - input.initialize(sInputs, createMergedInputContext(input)); + SortedGroupedMergedInput input = new SortedGroupedMergedInput(createMergedInputContext(), sInputs); + KeyValuesReader kvsReader = input.getReader(); assertFalse(kvsReader.next()); } @@ -360,12 +359,12 @@ public class TestSortedGroupedMergedInput { final SortedTestKeyValuesReader reader; SortedTestInput(SortedTestKeyValuesReader reader) { + super(null, 0); this.reader = reader; } @Override - public List<Event> initialize(TezInputContext inputContext) throws Exception { - inputContext.inputIsReady(); + public List<Event> initialize() throws IOException { return null; } @@ -387,10 +386,6 @@ public class TestSortedGroupedMergedInput { return null; } - @Override - public void setNumPhysicalInputs(int numInputs) { - } - @SuppressWarnings("rawtypes") public RawComparator getInputKeyComparator() { return new RawComparatorForTest(); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java index 3eb92c9..51c6a25 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileSortedOutput.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.tez.runtime.library.output; import org.apache.hadoop.conf.Configuration; @@ -45,23 +63,6 @@ import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ @RunWith(Parameterized.class) public class TestOnFileSortedOutput { @@ -147,10 +148,9 @@ public class TestOnFileSortedOutput { } private void startSortedOutput(int partitions) throws Exception { - sortedOutput = new OnFileSortedOutput(); - sortedOutput.setNumPhysicalOutputs(partitions); TezOutputContext context = createTezOutputContext(); - sortedOutput.initialize(context); + sortedOutput = new OnFileSortedOutput(context, partitions); + sortedOutput.initialize(); sortedOutput.start(); writer = sortedOutput.getWriter(); } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java ---------------------------------------------------------------------- diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java index fbe3b03..ac91e44 100644 --- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java +++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/output/TestOnFileUnorderedKVOutput.java @@ -91,9 +91,6 @@ public class TestOnFileUnorderedKVOutput { @Test public void testGeneratedDataMovementEvent() throws Exception { - - OnFileUnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest(); - Configuration conf = new Configuration(); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); @@ -127,9 +124,11 @@ public class TestOnFileUnorderedKVOutput { taskAttemptID, counters, 0, userPayload, runtimeTask, null, auxEnv, new MemoryDistributor(1, 1, conf) , outputDescriptor, null); + OnFileUnorderedKVOutput kvOutput = new OnFileUnorderedKVOutputForTest(outputContext, 1); + List<Event> events = null; - events = kvOutput.initialize(outputContext); + events = kvOutput.initialize(); assertTrue(events != null && events.size() == 0); KeyValueWriter kvWriter = kvOutput.getWriter(); @@ -154,6 +153,11 @@ public class TestOnFileUnorderedKVOutput { } private static class OnFileUnorderedKVOutputForTest extends OnFileUnorderedKVOutput { + + public OnFileUnorderedKVOutputForTest(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + @Override String getHost() { return "host"; http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java index 67fc6a5..b0d5061 100644 --- a/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java +++ b/tez-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java @@ -667,9 +667,16 @@ public class TestMRRJobsDAGApi { // the path it writes to is not dynamic. private static String RELOCALIZATION_TEST_CLASS_NAME = "AMClassloadTestDummyClass"; public static class MRInputAMSplitGeneratorRelocalizationTest extends MRInputAMSplitGenerator { - public List<Event> initialize(TezRootInputInitializerContext rootInputContext) throws Exception { + + public MRInputAMSplitGeneratorRelocalizationTest( + TezRootInputInitializerContext initializerContext) { + super(initializerContext); + } + + @Override + public List<Event> initialize() throws Exception { MRInputUserPayloadProto userPayloadProto = MRHelpers - .parseMRInputPayload(rootInputContext.getInputUserPayload()); + .parseMRInputPayload(getContext().getInputUserPayload()); Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto .getConfigurationBytes()); @@ -682,7 +689,7 @@ public class TestMRRJobsDAGApi { LOG.info("Class not found"); } - return super.initialize(rootInputContext); + return super.initialize(); } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/test/TestInput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java index e8c8ed1..51d24bc 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestInput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestInput.java @@ -32,6 +32,7 @@ import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.runtime.api.AbstractLogicalInput; import org.apache.tez.runtime.api.Event; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezInputContext; import org.apache.tez.runtime.api.events.DataMovementEvent; import org.apache.tez.runtime.api.events.InputFailedEvent; import org.apache.tez.runtime.api.events.InputReadErrorEvent; @@ -108,7 +109,17 @@ public class TestInput extends AbstractLogicalInput { */ public static String TEZ_FAILING_INPUT_FAILING_TASK_ATTEMPT = "tez.failing-input.failing-task-attempt"; - + + public TestInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + this.completedInputVersion = new int[numPhysicalInputs]; + this.inputValues = new int[numPhysicalInputs]; + for (int i=0; i<numPhysicalInputs; ++i) { + this.completedInputVersion[i] = -1; + this.inputValues[i] = -1; + } + } + public static InputDescriptor getInputDesc(byte[] payload) { return new InputDescriptor(TestInput.class.getName()). setUserPayload(payload); @@ -140,7 +151,7 @@ public class TestInput extends AbstractLogicalInput { (lastInputReadyValue <= failingInputUpto)) { List<Event> events = Lists.newLinkedList(); if (failingInputIndices.contains(failAll)) { - for (int i=0; i<numPhysicalInputs; ++i) { + for (int i=0; i<getNumPhysicalInputs(); ++i) { String msg = ("FailingInput: " + getContext().getUniqueIdentifier() + " index: " + i + " version: " + lastInputReadyValue); events.add(new InputReadErrorEvent(msg, i, lastInputReadyValue)); @@ -148,9 +159,9 @@ public class TestInput extends AbstractLogicalInput { } } else { for (Integer index : failingInputIndices) { - if (index.intValue() >= numPhysicalInputs) { + if (index.intValue() >= getNumPhysicalInputs()) { throwException("InputIndex: " + index.intValue() + - " should be less than numInputs: " + numPhysicalInputs); + " should be less than numInputs: " + getNumPhysicalInputs()); } if (completedInputVersion[index.intValue()] < lastInputReadyValue) { continue; // dont fail a previous version now. @@ -196,7 +207,7 @@ public class TestInput extends AbstractLogicalInput { // sum input value given by upstream tasks int sum = 0; - for (int i=0; i<numPhysicalInputs; ++i) { + for (int i=0; i<getNumPhysicalInputs(); ++i) { if (inputValues[i] == -1) { throwException("Invalid input value : " + i); } @@ -215,7 +226,7 @@ public class TestInput extends AbstractLogicalInput { public static String getVertexConfName(String confName, String vertexName) { return confName + "." + vertexName; } - + @Override public List<Event> initialize() throws Exception { getContext().requestInitialMemory(0l, null); //Mandatory call. @@ -272,7 +283,7 @@ public class TestInput extends AbstractLogicalInput { LOG.info("Received DataMovement event sourceId : " + dmEvent.getSourceIndex() + " targetId: " + dmEvent.getTargetIndex() + " version: " + dmEvent.getVersion() + - " numInputs: " + numPhysicalInputs + + " numInputs: " + getNumPhysicalInputs() + " numCompletedInputs: " + numCompletedInputs); this.completedInputVersion[dmEvent.getTargetIndex()] = dmEvent.getVersion(); this.inputValues[dmEvent.getTargetIndex()] = @@ -282,13 +293,13 @@ public class TestInput extends AbstractLogicalInput { numCompletedInputs--; LOG.info("Received InputFailed event targetId: " + ifEvent.getTargetIndex() + " version: " + ifEvent.getVersion() + - " numInputs: " + numPhysicalInputs + + " numInputs: " + getNumPhysicalInputs() + " numCompletedInputs: " + numCompletedInputs); } } - if (numCompletedInputs == numPhysicalInputs) { + if (numCompletedInputs == getNumPhysicalInputs()) { int maxInputVersionSeen = -1; - for (int i=0; i<numPhysicalInputs; ++i) { + for (int i=0; i<getNumPhysicalInputs(); ++i) { if (completedInputVersion[i] < 0) { LOG.info("Not received completion for input " + i); return; @@ -310,14 +321,4 @@ public class TestInput extends AbstractLogicalInput { return null; } - @Override - public void setNumPhysicalInputs(int numInputs) { - this.numPhysicalInputs = numInputs; - this.completedInputVersion = new int[numInputs]; - this.inputValues = new int[numInputs]; - for (int i=0; i<numInputs; ++i) { - this.completedInputVersion[i] = -1; - this.inputValues[i] = -1; - } - } } http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java index 0ab1070..064119f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestOutput.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.tez.dag.api.OutputDescriptor; import org.apache.tez.runtime.api.AbstractLogicalOutput; import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.api.Writer; import org.apache.tez.runtime.api.events.DataMovementEvent; @@ -34,7 +35,11 @@ import com.google.common.collect.Lists; public class TestOutput extends AbstractLogicalOutput { private static final Log LOG = LogFactory.getLog(TestOutput.class); - + + public TestOutput(TezOutputContext outputContext, int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + public static OutputDescriptor getOutputDesc(byte[] payload) { return new OutputDescriptor(TestOutput.class.getName()). setUserPayload(payload); http://git-wip-us.apache.org/repos/asf/tez/blob/2213c109/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java index a6768a7..76e2bc6 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java +++ b/tez-tests/src/test/java/org/apache/tez/test/dag/MultiAttemptDAG.java @@ -45,6 +45,8 @@ import org.apache.tez.runtime.api.MemoryUpdateCallback; import org.apache.tez.runtime.api.OutputCommitter; import org.apache.tez.runtime.api.OutputCommitterContext; import org.apache.tez.runtime.api.Reader; +import org.apache.tez.runtime.api.TezInputContext; +import org.apache.tez.runtime.api.TezOutputContext; import org.apache.tez.runtime.api.TezRootInputInitializer; import org.apache.tez.runtime.api.TezRootInputInitializerContext; import org.apache.tez.runtime.api.Writer; @@ -76,16 +78,18 @@ public class MultiAttemptDAG { public static class FailOnAttemptVertexManagerPlugin extends VertexManagerPlugin { private int numSourceTasks = 0; private AtomicInteger numCompletions = new AtomicInteger(); - private VertexManagerPluginContext context; private boolean tasksScheduled = false; + public FailOnAttemptVertexManagerPlugin(VertexManagerPluginContext context) { + super(context); + } + @Override - public void initialize(VertexManagerPluginContext context) { - this.context = context; + public void initialize() { for (String input : - context.getInputVertexEdgeProperties().keySet()) { + getContext().getInputVertexEdgeProperties().keySet()) { LOG.info("Adding sourceTasks for Vertex " + input); - numSourceTasks += context.getVertexNumTasks(input); + numSourceTasks += getContext().getVertexNumTasks(input); LOG.info("Current numSourceTasks=" + numSourceTasks); } } @@ -107,21 +111,21 @@ public class MultiAttemptDAG { if (numCompletions.get() >= numSourceTasks && !tasksScheduled) { tasksScheduled = true; - String payload = new String(context.getUserPayload()); + String payload = new String(getContext().getUserPayload()); int successAttemptId = Integer.valueOf(payload); LOG.info("Checking whether to crash AM or schedule tasks" + ", successfulAttemptID=" + successAttemptId - + ", currentAttempt=" + context.getDAGAttemptNumber()); - if (successAttemptId > context.getDAGAttemptNumber()) { + + ", currentAttempt=" + getContext().getDAGAttemptNumber()); + if (successAttemptId > getContext().getDAGAttemptNumber()) { Runtime.getRuntime().halt(-1); - } else if (successAttemptId == context.getDAGAttemptNumber()) { - LOG.info("Scheduling tasks for vertex=" + context.getVertexName()); - int numTasks = context.getVertexNumTasks(context.getVertexName()); + } else if (successAttemptId == getContext().getDAGAttemptNumber()) { + LOG.info("Scheduling tasks for vertex=" + getContext().getVertexName()); + int numTasks = getContext().getVertexNumTasks(getContext().getVertexName()); List<TaskWithLocationHint> scheduledTasks = Lists.newArrayListWithCapacity(numTasks); for (int i=0; i<numTasks; ++i) { scheduledTasks.add(new TaskWithLocationHint(new Integer(i), null)); } - context.scheduleVertexTasks(scheduledTasks); + getContext().scheduleVertexTasks(scheduledTasks); } } } @@ -150,11 +154,15 @@ public class MultiAttemptDAG { boolean failOnCommit = false; + public FailingOutputCommitter(OutputCommitterContext committerContext) { + super(committerContext); + } + @Override - public void initialize(OutputCommitterContext context) throws Exception { + public void initialize() throws Exception { FailingOutputCommitterConfig config = new FailingOutputCommitterConfig(); - config.fromUserPayload(context.getOutputUserPayload()); + config.fromUserPayload(getContext().getOutputUserPayload()); failOnCommit = config.failOnCommit; } @@ -204,14 +212,18 @@ public class MultiAttemptDAG { public static class FailingInputInitializer extends TezRootInputInitializer { + public FailingInputInitializer(TezRootInputInitializerContext initializerContext) { + super(initializerContext); + } + @Override - public List<Event> initialize(TezRootInputInitializerContext inputVertexContext) throws Exception { + public List<Event> initialize() throws Exception { try { Thread.sleep(2000l); } catch (InterruptedException e) { // Ignore } - if (inputVertexContext.getDAGAttemptNumber() == 1) { + if (getContext().getDAGAttemptNumber() == 1) { LOG.info("Shutting down the AM in 1st attempt"); Runtime.getRuntime().halt(-1); } @@ -227,9 +239,13 @@ public class MultiAttemptDAG { public static class NoOpInput extends AbstractLogicalInput { + public NoOpInput(TezInputContext inputContext, int numPhysicalInputs) { + super(inputContext, numPhysicalInputs); + } + @Override public List<Event> initialize() throws Exception { - inputContext.requestInitialMemory(1l, new MemoryUpdateCallback() { + getContext().requestInitialMemory(1l, new MemoryUpdateCallback() { @Override public void memoryAssigned(long assignedSize) {} }); @@ -259,11 +275,17 @@ public class MultiAttemptDAG { public static class NoOpOutput extends AbstractLogicalOutput { + public NoOpOutput(TezOutputContext outputContext, + int numPhysicalOutputs) { + super(outputContext, numPhysicalOutputs); + } + @Override public List<Event> initialize() throws Exception { - outputContext.requestInitialMemory(1l, new MemoryUpdateCallback() { + getContext().requestInitialMemory(1l, new MemoryUpdateCallback() { @Override - public void memoryAssigned(long assignedSize) {} + public void memoryAssigned(long assignedSize) { + } }); return null; }
