Repository: tez Updated Branches: refs/heads/master 42ef63665 -> 685fa742f
TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization (zjffdu) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/685fa742 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/685fa742 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/685fa742 Branch: refs/heads/master Commit: 685fa742f7042876d8a87544b1e08ce566837e65 Parents: 42ef636 Author: Jeff Zhang <[email protected]> Authored: Wed Dec 16 10:02:13 2015 +0800 Committer: Jeff Zhang <[email protected]> Committed: Wed Dec 16 10:03:40 2015 +0800 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/tez/dag/app/dag/impl/VertexImpl.java | 2 + .../apache/tez/examples/HashJoinExample.java | 6 ++- .../org/apache/tez/examples/JoinValidate.java | 6 ++- .../apache/tez/examples/OrderedWordCount.java | 7 ++-- .../tez/examples/SimpleSessionExample.java | 2 +- .../tez/examples/SortMergeJoinExample.java | 6 ++- .../org/apache/tez/examples/TezExampleBase.java | 19 +++++++-- .../java/org/apache/tez/examples/WordCount.java | 3 +- .../RecoveryServiceWithEventHandlingHook.java | 4 ++ .../java/org/apache/tez/test/TestLocalMode.java | 2 +- .../java/org/apache/tez/test/TestRecovery.java | 44 ++++++++++++++------ 12 files changed, 75 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0236084..a9d1893 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -10,6 +10,7 @@ INCOMPATIBLE CHANGES TEZ-604. Revert temporary changes made in TEZ-603 to kill the provided tez session, if running a MapReduce job. ALL CHANGES: + TEZ-2976. Recovery fails when InputDescriptor is changed during input initialization. TEZ-2997. Tez UI: Support searches by CallerContext ID for DAGs TEZ-2996. TestAnalyzer fails in trunk after recovery redesign TEZ-2987. TestVertexImpl.testTez2684 fails http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java index f3cfb58..4e82560 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java @@ -2231,6 +2231,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl if (initGeneratedEvents != null && !initGeneratedEvents.isEmpty()) { eventHandler.handle(new VertexEventRouteEvent(getVertexId(), initGeneratedEvents)); } + // reset rootInputDescriptor because it may be changed during input initialization. + this.rootInputDescriptors = recoveryData.getVertexInitedEvent().getAdditionalInputs(); } else { initedTime = clock.getTime(); } http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java index 680de35..935ccbc 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/HashJoinExample.java @@ -157,7 +157,8 @@ public class HashJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, hashPath.toUri().toString()) - .groupSplits(!isDisableSplitGrouping()).build()); + .groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build()); /** * This vertex represents that side of the data that will be streamed and @@ -173,7 +174,8 @@ public class HashJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, streamPath.toUri().toString()) - .groupSplits(!isDisableSplitGrouping()).build()); + .groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build()); /** * This vertex represents the join operation. It writes the join output as http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java index 4883351..186bacd 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/JoinValidate.java @@ -155,14 +155,16 @@ public class JoinValidate extends TezExampleBase { ForwardingProcessor.class.getName())).addDataSource("lhs", MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, - lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); + lhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build()); setVertexExecutionContext(lhsVertex, getLhsExecutionContext()); Vertex rhsVertex = Vertex.create(RHS_INPUT_NAME, ProcessorDescriptor.create( ForwardingProcessor.class.getName())).addDataSource("rhs", MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, - rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()).build()); + rhs.toUri().toString()).groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build()); setVertexExecutionContext(rhsVertex, getRhsExecutionContext()); Vertex joinValidateVertex = Vertex.create("joinvalidate", ProcessorDescriptor.create( http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java index fff7c1b..6596809 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/OrderedWordCount.java @@ -123,10 +123,11 @@ public class OrderedWordCount extends TezExampleBase { } public static DAG createDAG(TezConfiguration tezConf, String inputPath, String outputPath, - int numPartitions, boolean disableSplitGrouping, String dagName) throws IOException { + int numPartitions, boolean disableSplitGrouping, boolean isGenerateSplitInClient, String dagName) throws IOException { DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf), - TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping).build(); + TextInputFormat.class, inputPath).groupSplits(!disableSplitGrouping) + .generateSplitsInAM(!isGenerateSplitInClient).build(); DataSinkDescriptor dataSink = MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outputPath).build(); @@ -198,7 +199,7 @@ public class OrderedWordCount extends TezExampleBase { TezClient tezClient) throws Exception { DAG dag = createDAG(tezConf, args[0], args[1], args.length == 3 ? Integer.parseInt(args[2]) : 1, isDisableSplitGrouping(), - "OrderedWordCount"); + isGenerateSplitInClient(), "OrderedWordCount"); LOG.info("Running OrderedWordCount"); return runDag(dag, isCountersLog(), LOG); } http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java index 08a4b12..d555f47 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/SimpleSessionExample.java @@ -87,7 +87,7 @@ public class SimpleSessionExample extends TezExampleBase { for (int i = 0; i < inputPaths.length; ++i) { DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], numPartitions, - isDisableSplitGrouping(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session + isDisableSplitGrouping(), isGenerateSplitInClient(), ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session LOG.info("Running dag number " + i); if(runDag(dag, isCountersLog(), LOG) != 0) { http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java index 5a0cbd8..1054e00 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/SortMergeJoinExample.java @@ -158,7 +158,8 @@ public class SortMergeJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath1.toUri().toString()) - .groupSplits(!isDisableSplitGrouping()).build()); + .groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build()); /** * The other vertex represents the other side of the join. It reads text @@ -173,7 +174,8 @@ public class SortMergeJoinExample extends TezExampleBase { MRInput .createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath2.toUri().toString()) - .groupSplits(!isDisableSplitGrouping()).build()); + .groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build()); /** * This vertex represents the join operation. It writes the join output as http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java index 281eaa9..6960559 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/TezExampleBase.java @@ -58,10 +58,12 @@ public abstract class TezExampleBase extends Configured implements Tool { protected static final String DISABLE_SPLIT_GROUPING = "disableSplitGrouping"; protected static final String LOCAL_MODE = "local"; protected static final String COUNTER_LOG = "counter"; + protected static final String GENERATE_SPLIT_IN_CLIENT = "generateSplitInClient"; private boolean disableSplitGrouping = false; private boolean isLocalMode = false; private boolean isCountersLog = false; + private boolean generateSplitInClient = false; protected boolean isCountersLog() { return isCountersLog; @@ -71,11 +73,16 @@ public abstract class TezExampleBase extends Configured implements Tool { return disableSplitGrouping; } + protected boolean isGenerateSplitInClient() { + return generateSplitInClient; + } + private Options getExtraOptions() { Options options = new Options(); options.addOption(LOCAL_MODE, false, "run it as local mode"); options.addOption(DISABLE_SPLIT_GROUPING, false , "disable split grouping"); options.addOption(COUNTER_LOG, false , "print counter log"); + options.addOption(GENERATE_SPLIT_IN_CLIENT, false, "whether generate split in client"); return options; } @@ -91,9 +98,11 @@ public abstract class TezExampleBase extends Configured implements Tool { disableSplitGrouping = true; } if (optionParser.getCommandLine().hasOption(COUNTER_LOG)) { - isCountersLog = true; + isCountersLog = true; + } + if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) { + generateSplitInClient = true; } - return _execute(otherArgs, null, null); } @@ -124,7 +133,10 @@ public abstract class TezExampleBase extends Configured implements Tool { disableSplitGrouping = true; } if (optionParser.getCommandLine().hasOption(COUNTER_LOG)) { - isCountersLog = true; + isCountersLog = true; + } + if (optionParser.getCommandLine().hasOption(GENERATE_SPLIT_IN_CLIENT)) { + generateSplitInClient = true; } String[] otherArgs = optionParser.getRemainingArgs(); return _execute(otherArgs, conf, tezClient); @@ -238,6 +250,7 @@ public abstract class TezExampleBase extends Configured implements Tool { ps.println("-" + DISABLE_SPLIT_GROUPING + "\t\t disable split grouping for MRInput," + " enable split grouping without this option."); ps.println("-" + COUNTER_LOG + "\t\t to print counters information"); + ps.println("-" + GENERATE_SPLIT_IN_CLIENT + "\t\tgenerate input split in client"); ps.println(); ps.println("The Tez example extra options usage syntax is "); ps.println("example_name [extra_options] [example_parameters]"); http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java index 3578267..6149193 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/WordCount.java @@ -143,7 +143,8 @@ public class WordCount extends TezExampleBase { // Create the descriptor that describes the input data to Tez. Using MRInput to read text // data from the given input path. The TextInputFormat is used to read the text data. DataSourceDescriptor dataSource = MRInput.createConfigBuilder(new Configuration(tezConf), - TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping()).build(); + TextInputFormat.class, inputPath).groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build(); // Create a descriptor that describes the output data to Tez. Using MROoutput to write text // data to the given output path. The TextOutputFormat is used to write the text data. http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java index cec8fbd..8a0f39e 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java +++ b/tez-tests/src/test/java/org/apache/tez/test/RecoveryServiceWithEventHandlingHook.java @@ -222,6 +222,10 @@ public class RecoveryServiceWithEventHandlingHook extends RecoveryService { public SimpleShutdownCondition() { } + public HistoryEvent getHistoryEvent() { + return this.event; + } + private String encodeHistoryEvent(HistoryEvent event) throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); event.toProtoStream(out); http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java index 3a03739..2a5b65f 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestLocalMode.java @@ -230,7 +230,7 @@ public class TestLocalMode { try { for (int i=0; i<inputPaths.length; ++i) { DAG dag = OrderedWordCount.createDAG(tezConf, inputPaths[i], outputPaths[i], 1, - false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session + false, false, ("DAG-Iteration-" + i)); // the names of the DAGs must be unique in a session tezClient.waitTillReady(); System.out.println("Running dag number " + i); http://git-wip-us.apache.org/repos/asf/tez/blob/685fa742/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java index 4f5ef1a..dc26167 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java @@ -52,6 +52,7 @@ import org.apache.tez.dag.app.dag.DAGState; import org.apache.tez.dag.app.dag.VertexState; import org.apache.tez.dag.app.dag.impl.VertexStats; import org.apache.tez.dag.history.HistoryEvent; +import org.apache.tez.dag.history.HistoryEventType; import org.apache.tez.dag.history.events.DAGFinishedEvent; import org.apache.tez.dag.history.events.DAGInitializedEvent; import org.apache.tez.dag.history.events.DAGStartedEvent; @@ -70,6 +71,7 @@ import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.examples.HashJoinExample; import org.apache.tez.examples.OrderedWordCount; +import org.apache.tez.examples.TezExampleBase; import org.apache.tez.runtime.api.events.InputDataInformationEvent; import org.apache.tez.runtime.api.impl.TezEvent; import org.apache.tez.test.RecoveryServiceWithEventHandlingHook.SimpleRecoveryEventHook; @@ -239,13 +241,15 @@ public class TestRecovery { // randomly choose half of the test scenario to avoid // timeout. if (rand.nextDouble() < 0.5) { - testOrderedWordCount(shutdownConditions.get(i), true); + // generate split in client side when HistoryEvent type is VERTEX_STARTED (TEZ-2976) + testOrderedWordCount(shutdownConditions.get(i), true, + shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED); } } } private void testOrderedWordCount(SimpleShutdownCondition shutdownCondition, - boolean enableAutoParallelism) throws Exception { + boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception { LOG.info("shutdownCondition:" + shutdownCondition.getEventType() + ", event=" + shutdownCondition.getEvent()); String inputDirStr = "/tmp/owc-input/"; @@ -276,11 +280,16 @@ public class TestRecovery { tezConf.setBoolean( TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false); tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO;org.apache.tez=DEBUG"); - OrderedWordCount job = new OrderedWordCount(); - Assert - .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[] { - inputDirStr, outputDirStr, "5" }, null) == 0); + if (generateSplitInClient) { + Assert + .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{ + "-generateSplitInClient", inputDirStr, outputDirStr, "5"}, null) == 0); + } else { + Assert + .assertTrue("OrderedWordCount failed", job.run(tezConf, new String[]{ + inputDirStr, outputDirStr, "5"}, null) == 0); + } TestTezJobs.verifyOutput(outputDir, remoteFs); List<HistoryEvent> historyEventsOfAttempt1 = RecoveryParser .readRecoveryEvents(tezConf, job.getAppId(), 1); @@ -392,13 +401,15 @@ public class TestRecovery { // randomly choose half of the test scenario to avoid // timeout. if (rand.nextDouble() < 0.5) { - testHashJoinExample(shutdownConditions.get(i), true); + // generate split in client side when HistoryEvent type is VERTEX_STARTED (TEZ-2976) + testHashJoinExample(shutdownConditions.get(i), true, + shutdownConditions.get(i).getHistoryEvent().getEventType() == HistoryEventType.VERTEX_STARTED); } } } private void testHashJoinExample(SimpleShutdownCondition shutdownCondition, - boolean enableAutoParallelism) throws Exception { + boolean enableAutoParallelism, boolean generateSplitInClient) throws Exception { HashJoinExample hashJoinExample = new HashJoinExample(); TezConfiguration tezConf = new TezConfiguration(miniTezCluster.getConfig()); tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4); @@ -449,10 +460,19 @@ public class TestRecovery { out1.close(); out2.close(); - String[] args = new String[] { - "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" - + stagingDirPath.toString(), inPath1.toString(), - inPath2.toString(), "1", outPath.toString() }; + String[] args = null; + if (generateSplitInClient) { + args = new String[]{ + "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + + stagingDirPath.toString(), + "-generateSplitInClient", + inPath1.toString(), inPath2.toString(), "1", outPath.toString()}; + } else { + args = new String[]{ + "-D" + TezConfiguration.TEZ_AM_STAGING_DIR + "=" + + stagingDirPath.toString(), + inPath1.toString(), inPath2.toString(), "1", outPath.toString()}; + } assertEquals(0, hashJoinExample.run(args)); FileStatus[] statuses = remoteFs.listStatus(outPath, new PathFilter() {
