Repository: tez
Updated Branches:
  refs/heads/master 363dc07fc -> af1884a12
TEZ-1322. OrderedWordCount broken in master branch. (hitesh) Project: http://git-wip-us.apache.org/repos/asf/tez/repo Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/af1884a1 Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/af1884a1 Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/af1884a1 Branch: refs/heads/master Commit: af1884a1254bb3208daf09b184004b5007f43a72 Parents: 363dc07 Author: Hitesh Shah <[email protected]> Authored: Thu Jul 31 11:18:28 2014 -0700 Committer: Hitesh Shah <[email protected]> Committed: Thu Jul 31 11:18:28 2014 -0700 ---------------------------------------------------------------------- .../impl/SimpleHistoryLoggingService.java | 6 +- .../mapreduce/examples/IntersectValidate.java | 2 +- .../mapreduce/examples/OrderedWordCount.java | 43 +++-- .../java/org/apache/tez/test/TestTezJobs.java | 155 ++++++++++++++++++- 4 files changed, 187 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java index 0300225..8ef1920 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java @@ -63,6 +63,8 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService { final String logFileName = LOG_FILE_NAME_PREFIX + "." 
+ appContext.getApplicationAttemptId(); if (logDirPath == null || logDirPath.isEmpty()) { String logDir = TezUtils.getContainerLogDir(); + LOG.info("Log file location for SimpleHistoryLoggingService not specified, defaulting to" + + " containerLogDir=" + logDir); Path p; logFileFS = FileSystem.getLocal(conf); if (logDir != null) { @@ -72,6 +74,8 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService { } logFileLocation = p; } else { + LOG.info("Using configured log file location for SimpleHistoryLoggingService" + + " logDirPath=" + logDirPath); Path p = new Path(logDirPath); logFileFS = p.getFileSystem(conf); if (!logFileFS.exists(p)) { @@ -126,7 +130,7 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService { } try { if (outputStream != null) { - outputStream.hsync(); + outputStream.hflush(); outputStream.close(); } } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java index bf5aa01..b0a5c6c 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java @@ -180,7 +180,7 @@ public class IntersectValidate extends Configured implements Tool { LOG.info("Validate failed. The two sides are not equivalent"); return -3; } else { - LOG.info("Vlidation successful. The two sides are equivalent"); + LOG.info("Validation successful. 
The two sides are equivalent"); return 0; } } http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java index 37de9b5..7af5402 100644 --- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java +++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java @@ -76,10 +76,13 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig; import org.apache.tez.mapreduce.processor.map.MapProcessor; import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor; import org.apache.tez.runtime.api.TezRootInputInitializer; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.apache.tez.runtime.library.processor.SleepProcessor; +import com.google.common.annotations.VisibleForTesting; + /** * An MRR job built on top of word count to return words sorted by * their frequency of occurrence. 
@@ -144,8 +147,9 @@ public class OrderedWordCount extends Configured implements Tool { } private Credentials credentials = new Credentials(); - - private DAG createDAG(FileSystem fs, Configuration conf, + + @VisibleForTesting + public DAG createDAG(FileSystem fs, Configuration conf, Map<String, LocalResource> commonLocalResources, Path stagingDir, int dagIndex, String inputPath, String outputPath, boolean generateSplitsInClient) throws Exception { @@ -172,15 +176,14 @@ public class OrderedWordCount extends Configured implements Tool { MRHelpers.translateVertexConfToTez(mapStageConf); Configuration iReduceStageConf = new JobConf(conf); - iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); // TODO NEWTEZ - NOT NEEDED NOW??? + // TODO replace with auto-reduce parallelism + iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR, IntSumReducer.class.getName()); - iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS, + iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); - iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS, - Text.class.getName()); iReduceStageConf.setBoolean("mapred.mapper.new-api", true); - MRHelpers.translateVertexConfToTez(iReduceStageConf); Configuration finalReduceConf = new JobConf(conf); @@ -191,7 +194,8 @@ public class OrderedWordCount extends Configured implements Tool { TextOutputFormat.class.getName()); finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath); finalReduceConf.setBoolean("mapred.mapper.new-api", true); - + finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName()); + finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName()); MRHelpers.translateVertexConfToTez(finalReduceConf); MRHelpers.doJobClientMagic(mapStageConf); @@ -255,18 +259,25 @@ public class 
OrderedWordCount extends Configured implements Tool { MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload); vertices.add(finalReduceVertex); - OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer - .newBuilder(IntWritable.class.getName(), Text.class.getName(), - HashPartitioner.class.getName(), null).configureInput().useLegacyInput().done().build(); - DAG dag = new DAG("OrderedWordCount" + dagIndex); for (int i = 0; i < vertices.size(); ++i) { dag.addVertex(vertices.get(i)); - if (i != 0) { - dag.addEdge( - new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty())); - } } + + OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer + .newBuilder(Text.class.getName(), IntWritable.class.getName(), + HashPartitioner.class.getName(), null).configureInput().useLegacyInput().done().build(); + dag.addEdge( + new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"), + edgeConf1.createDefaultEdgeProperty())); + + OrderedPartitionedKVEdgeConfigurer edgeConf2 = OrderedPartitionedKVEdgeConfigurer + .newBuilder(IntWritable.class.getName(), Text.class.getName(), + HashPartitioner.class.getName(), null).configureInput().useLegacyInput().done().build(); + dag.addEdge( + new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"), + edgeConf2.createDefaultEdgeProperty())); + return dag; } http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java ---------------------------------------------------------------------- diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java index baf0e87..e35bc07 100644 --- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java +++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java @@ -25,12 +25,15 @@ import static org.junit.Assert.fail; import 
java.io.BufferedReader; import java.io.BufferedWriter; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.util.HashSet; +import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -42,6 +45,8 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.tez.client.TezClient; @@ -60,6 +65,7 @@ import org.apache.tez.mapreduce.examples.ExampleDriver; import org.apache.tez.mapreduce.examples.IntersectDataGen; import org.apache.tez.mapreduce.examples.IntersectExample; import org.apache.tez.mapreduce.examples.IntersectValidate; +import org.apache.tez.mapreduce.examples.OrderedWordCount; import org.apache.tez.runtime.library.processor.SleepProcessor; import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig; import org.junit.AfterClass; @@ -256,7 +262,7 @@ public class TestTezJobs { assertEquals(1, statuses.length); FSDataInputStream inStream = remoteFs.open(statuses[0].getPath()); BufferedReader reader = new BufferedReader(new InputStreamReader(inStream)); - String line = null; + String line; while ((line = reader.readLine()) != null) { assertTrue(expectedResult.remove(line)); } @@ -404,4 +410,151 @@ public class TestTezJobs { tezSession.stop(); } + private void generateOrderedWordCountInput(Path inputDir) throws IOException { + Path dataPath1 = new Path(inputDir, "inPath1"); + Path dataPath2 = new Path(inputDir, "inPath2"); + + FSDataOutputStream f1 = null; + FSDataOutputStream f2 = null; + try { + f1 = 
remoteFs.create(dataPath1); + f2 = remoteFs.create(dataPath2); + + final String prefix = "a"; + for (int i = 1; i <= 10; ++i) { + final String word = prefix + "_" + i; + for (int j = 10; j >= i; --j) { + LOG.info("Writing " + word + " to input files"); + f1.write(word.getBytes()); + f1.writeChars("\t"); + f2.write(word.getBytes()); + f2.writeChars("\t"); + } + } + f1.hsync(); + f2.hsync(); + } finally { + if (f1 != null) { + f1.close(); + } + if (f2 != null) { + f2.close(); + } + } + } + + private void verifyOrderedWordCountOutput(Path resultFile) throws IOException { + FSDataInputStream inputStream = remoteFs.open(resultFile); + final String prefix = "a"; + int currentCounter = 10; + + byte[] buffer = new byte[4096]; + int bytesRead = inputStream.read(buffer, 0, 4096); + + BufferedReader reader = + new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer, 0, bytesRead))); + + String line; + while ((line = reader.readLine()) != null) { + LOG.info("Line: " + line + ", counter=" + currentCounter); + int pos = line.indexOf("\t"); + String word = line.substring(0, pos-1); + Assert.assertEquals(prefix + "_" + currentCounter, word); + String val = line.substring(pos+1, line.length()); + Assert.assertEquals((long)(11 - currentCounter) * 2, (long)Long.valueOf(val)); + currentCounter--; + } + + Assert.assertEquals(0, currentCounter); + } + + @Test(timeout = 60000) + public void testOrderedWordCount() throws Exception { + String inputDirStr = "/tmp/owc-input/"; + Path inputDir = new Path(inputDirStr); + Path stagingDirPath = new Path("/tmp/owc-staging-dir"); + remoteFs.mkdirs(inputDir); + remoteFs.mkdirs(stagingDirPath); + generateOrderedWordCountInput(inputDir); + + String outputDirStr = "/tmp/owc-output/"; + Path outputDir = new Path(outputDirStr); + + TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig()); + Path simpleLogPath = new Path("/tmp/owc-logging/"); + remoteFs.mkdirs(simpleLogPath); + simpleLogPath = 
remoteFs.resolvePath(simpleLogPath); + tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, simpleLogPath.toString()); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString()); + TezClient tezSession = null; + + try { + tezSession = new TezClient("OrderedWordCountSession", tezConf); + tezSession.start(); + tezSession.waitTillReady(); + + Map<String, LocalResource> localResourceMap = new TreeMap<String, LocalResource>(); + + OrderedWordCount orderedWordCount = new OrderedWordCount(); + DAG dag = orderedWordCount.createDAG(remoteFs, tezConf, localResourceMap, stagingDirPath, + 1, inputDirStr, outputDirStr, false); + + DAGClient dagClient = tezSession.submitDAG(dag); + DAGStatus dagStatus = dagClient.getDAGStatus(null); + while (!dagStatus.isCompleted()) { + LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: " + + dagStatus.getState()); + Thread.sleep(500l); + dagStatus = dagClient.getDAGStatus(null); + } + dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS)); + + assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState()); + assertTrue(remoteFs.exists(outputDir)); + if (tezSession != null) { + tezSession.stop(); + tezSession = null; + } + + FileStatus[] fileStatuses = remoteFs.listStatus(outputDir); + Path resultFile = null; + boolean foundResult = false; + boolean foundSuccessFile = false; + for (FileStatus fileStatus : fileStatuses) { + if (!fileStatus.isFile()) { + continue; + } + if (fileStatus.getPath().getName().equals("_SUCCESS")) { + foundSuccessFile = true; + continue; + } + if (fileStatus.getPath().getName().startsWith("part-")) { + if (foundResult) { + fail("Found 2 part files instead of 1" + + ", paths=" + resultFile + "," + fileStatus.getPath()); + } + foundResult = true; + resultFile = fileStatus.getPath(); + LOG.info("Found output at " + resultFile); + } + } + assertTrue(foundResult); + assertTrue(resultFile != null); + assertTrue(foundSuccessFile); + 
verifyOrderedWordCountOutput(resultFile); + + // check simple history log exists + FileStatus[] fileStatuses1 = remoteFs.listStatus(simpleLogPath); + Assert.assertEquals(1, fileStatuses1.length); + Assert.assertTrue(fileStatuses1[0].getPath().getName().startsWith( + SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX)); + } finally { + if (tezSession != null) { + tezSession.stop(); + } + } + + } + + }
