TEZ-2219. Should verify the input_name/output_name to be unique per vertex (zjffdu)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/aa784be6 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/aa784be6 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/aa784be6 Branch: refs/heads/TEZ-2003 Commit: aa784be6e0a7d4b059212f047f3100cfa085f6e1 Parents: be982af Author: Jeff Zhang <[email protected]> Authored: Mon Mar 23 14:55:46 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Mon Mar 23 14:55:46 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../java/org/apache/tez/dag/api/Vertex.java | 35 ++++-- .../java/org/apache/tez/dag/api/TestDAG.java | 117 ++++++++++++++++++- 3 files changed, 141 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/aa784be6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 7a554d4..3c9d114 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -249,6 +249,7 @@ TEZ-UI CHANGES (TEZ-8): Release 0.5.4: Unreleased ALL CHANGES: + TEZ-2219. Should verify the input_name/output_name to be unique per vertex TEZ-2186. OOM with a simple scatter gather job with re-use TEZ-2220. TestTezJobs compile failure in branch 0.5. TEZ-2199. updateLocalResourcesForInputSplits assumes wrongly that split data is on same FS as the default FS. http://git-wip-us.apache.org/repos/asf/tez/blob/aa784be6/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java ---------------------------------------------------------------------- diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java index c8d3df7..0ed4bd8 100644 --- a/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java +++ b/tez-api/src/main/java/org/apache/tez/dag/api/Vertex.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; @@ -56,10 +57,10 @@ public class Vertex { private final Map<String, LocalResource> taskLocalResources = new HashMap<String, LocalResource>(); private Map<String, String> taskEnvironment = new HashMap<String, String>(); private Map<String, String> vertexConf = new HashMap<String, String>(); - private final List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs - = new ArrayList<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>(); - private final List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs - = new ArrayList<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>(); + private final Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs + = new HashMap<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>>(); + private final Map<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> additionalOutputs + = new HashMap<String, RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>>(); private VertexManagerPluginDescriptor vertexManagerPlugin; private final List<Vertex> inputVertices = new ArrayList<Vertex>(); @@ -329,8 +330,12 @@ public class Vertex { * @return this Vertex */ public Vertex addDataSource(String inputName, DataSourceDescriptor dataSourceDescriptor) { + Preconditions.checkArgument(StringUtils.isNotBlank(inputName), + "InputName should not be null, empty or white space only, inputName=" + inputName); + Preconditions.checkArgument(!additionalInputs.containsKey(inputName), + "Duplicated input:" + inputName + ", vertexName=" + vertexName); additionalInputs - .add(new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>( + .put(inputName, new RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>( inputName, dataSourceDescriptor.getInputDescriptor(), dataSourceDescriptor.getInputInitializerDescriptor())); this.dataSources.add(dataSourceDescriptor); @@ -356,19 +361,27 @@ public class Vertex { * @return this Vertex */ public Vertex addDataSink(String outputName, DataSinkDescriptor dataSinkDescriptor) { + Preconditions.checkArgument(StringUtils.isNotBlank(outputName), + "OutputName should not be null, empty or white space only, outputName=" + outputName); + Preconditions.checkArgument(!additionalOutputs.containsKey(outputName), + "Duplicated output:" + outputName + ", vertexName=" + vertexName); additionalOutputs - .add(new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>( + .put(outputName, new RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>( outputName, dataSinkDescriptor.getOutputDescriptor(), dataSinkDescriptor.getOutputCommitterDescriptor())); this.dataSinks.add(dataSinkDescriptor); return this; } - + Vertex addAdditionalDataSink(RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> output) { - additionalOutputs.add(output); + Preconditions.checkArgument(StringUtils.isNotBlank(output.getName()), + "OutputName should not be null, empty or white space only, outputName=" + output.getName()); + Preconditions.checkArgument(!additionalOutputs.containsKey(output.getName()), + "Duplicated output:" + output.getName() + ", vertexName=" + vertexName); + additionalOutputs.put(output.getName(), output); return this; } - + /** * Specifies a {@link VertexManagerPlugin} for the vertex. This plugin can be * used to modify the parallelism or reconfigure the vertex at runtime using @@ -471,11 +484,11 @@ public class Vertex { } List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> getInputs() { - return additionalInputs; + return Lists.newArrayList(additionalInputs.values()); } List<RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>> getOutputs() { - return additionalOutputs; + return Lists.newArrayList(additionalOutputs.values()); } @Override http://git-wip-us.apache.org/repos/asf/tez/blob/aa784be6/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java ---------------------------------------------------------------------- diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java index 8e7f80b..0590907 100644 --- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java +++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java @@ -177,7 +177,7 @@ public class TestDAG { // set invalid DAG level configuration try { v1.setConf(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false + ""); - Assert.fail(); + Assert.fail("should fail due to invalid configuration set"); } catch (IllegalStateException e) { Assert.assertEquals("tez.am.commit-all-outputs-on-dag-success is set at the scope of VERTEX," + " but it is only valid in the scope of DAG", @@ -186,4 +186,119 @@ public class TestDAG { // set valid Vertex level configuration v1.setConf(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 3 + ""); } + + @Test(timeout = 5000) + public void testDuplicatedInput() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor")); + DataSourceDescriptor dataSource = + DataSourceDescriptor.create(InputDescriptor.create("dummyInput"), null, null); + try { + v1.addDataSource(null, dataSource); + Assert.fail("Should fail due to invalid inputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("InputName should not be null, empty or white space only,")); + } + try { + v1.addDataSource("", dataSource); + Assert.fail("Should fail due to invalid inputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("InputName should not be null, empty or white space only,")); + } + try { + v1.addDataSource(" ", dataSource); + Assert.fail("Should fail due to invalid inputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("InputName should not be null, empty or white space only,")); + } + + v1.addDataSource("input_1", dataSource); + try { + v1.addDataSource("input_1", + DataSourceDescriptor.create(InputDescriptor.create("dummyInput"), null, null)); + Assert.fail("Should fail due to duplicated input"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Duplicated input:input_1, vertexName=v1", e.getMessage()); + } + } + + @Test(timeout = 5000) + public void testDuplicatedOutput_1() { + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor")); + DataSinkDescriptor dataSink = + DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null); + try { + v1.addDataSink(null, dataSink); + Assert.fail("Should fail due to invalid outputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("OutputName should not be null, empty or white space only,")); + } + try { + v1.addDataSink("", dataSink); + Assert.fail("Should fail due to invalid outputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("OutputName should not be null, empty or white space only,")); + } + try { + v1.addDataSink(" ", dataSink); + Assert.fail("Should fail due to invalid outputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("OutputName should not be null, empty or white space only,")); + } + + v1.addDataSink("output_1", + DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null)); + try { + v1.addDataSink("output_1", + DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null)); + Assert.fail("Should fail due to duplicated output"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Duplicated output:output_1, vertexName=v1", e.getMessage()); + } + } + + @Test(timeout = 5000) + public void testDuplicatedOutput_2() { + DAG dag = DAG.create("dag1"); + Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor")); + DataSinkDescriptor dataSink = + DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null); + try { + v1.addDataSink(null, dataSink); + Assert.fail("Should fail due to invalid outputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("OutputName should not be null, empty or white space only,")); + } + try { + v1.addDataSink("", dataSink); + Assert.fail("Should fail due to invalid outputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("OutputName should not be null, empty or white space only,")); + } + try { + v1.addDataSink(" ", dataSink); + Assert.fail("Should fail due to invalid outputName"); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage() + .contains("OutputName should not be null, empty or white space only,")); + } + + v1.addDataSink("output_1", dataSink); + Vertex v2 = Vertex.create("v1", ProcessorDescriptor.create("dummyProcessor")); + VertexGroup vGroup = dag.createVertexGroup("group_1", v1,v2); + try { + vGroup.addDataSink("output_1", + DataSinkDescriptor.create(OutputDescriptor.create("dummyOutput"), null, null)); + Assert.fail("Should fail due to duplicated output"); + } catch (IllegalArgumentException e) { + Assert.assertEquals("Duplicated output:output_1, vertexName=v1", e.getMessage()); + } + } }
