Repository: tez
Updated Branches:
  refs/heads/master 363dc07fc -> af1884a12


TEZ-1322. OrderedWordCount broken in master branch. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/af1884a1
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/af1884a1
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/af1884a1

Branch: refs/heads/master
Commit: af1884a1254bb3208daf09b184004b5007f43a72
Parents: 363dc07
Author: Hitesh Shah <[email protected]>
Authored: Thu Jul 31 11:18:28 2014 -0700
Committer: Hitesh Shah <[email protected]>
Committed: Thu Jul 31 11:18:28 2014 -0700

----------------------------------------------------------------------
 .../impl/SimpleHistoryLoggingService.java       |   6 +-
 .../mapreduce/examples/IntersectValidate.java   |   2 +-
 .../mapreduce/examples/OrderedWordCount.java    |  43 +++--
 .../java/org/apache/tez/test/TestTezJobs.java   | 155 ++++++++++++++++++-
 4 files changed, 187 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
index 0300225..8ef1920 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/SimpleHistoryLoggingService.java
@@ -63,6 +63,8 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
     final String logFileName = LOG_FILE_NAME_PREFIX + "." + appContext.getApplicationAttemptId();
     if (logDirPath == null || logDirPath.isEmpty()) {
       String logDir = TezUtils.getContainerLogDir();
+      LOG.info("Log file location for SimpleHistoryLoggingService not specified, defaulting to"
+          + " containerLogDir=" + logDir);
       Path p;
       logFileFS = FileSystem.getLocal(conf);
       if (logDir != null) {
@@ -72,6 +74,8 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
       }
       logFileLocation = p;
     } else {
+      LOG.info("Using configured log file location for SimpleHistoryLoggingService"
+          + " logDirPath=" + logDirPath);
       Path p = new Path(logDirPath);
       logFileFS = p.getFileSystem(conf);
       if (!logFileFS.exists(p)) {
@@ -126,7 +130,7 @@ public class SimpleHistoryLoggingService extends HistoryLoggingService {
     }
     try {
       if (outputStream != null) {
-        outputStream.hsync();
+        outputStream.hflush();
         outputStream.close();
       }
     } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
index bf5aa01..b0a5c6c 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/IntersectValidate.java
@@ -180,7 +180,7 @@ public class IntersectValidate extends Configured implements Tool {
           LOG.info("Validate failed. The two sides are not equivalent");
           return -3;
         } else {
-          LOG.info("Vlidation successful. The two sides are equivalent");
+          LOG.info("Validation successful. The two sides are equivalent");
           return 0;
         }
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 37de9b5..7af5402 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -76,10 +76,13 @@ import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.processor.map.MapProcessor;
 import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
 import org.apache.tez.runtime.api.TezRootInputInitializer;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfigurer;
 import org.apache.tez.runtime.library.partitioner.HashPartitioner;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An MRR job built on top of word count to return words sorted by
  * their frequency of occurrence.
@@ -144,8 +147,9 @@ public class OrderedWordCount extends Configured implements Tool {
   }
 
   private Credentials credentials = new Credentials();
-  
-  private DAG createDAG(FileSystem fs, Configuration conf,
+
+  @VisibleForTesting
+  public DAG createDAG(FileSystem fs, Configuration conf,
       Map<String, LocalResource> commonLocalResources, Path stagingDir,
       int dagIndex, String inputPath, String outputPath,
       boolean generateSplitsInClient) throws Exception {
@@ -172,15 +176,14 @@ public class OrderedWordCount extends Configured implements Tool {
     MRHelpers.translateVertexConfToTez(mapStageConf);
 
     Configuration iReduceStageConf = new JobConf(conf);
-    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2); // TODO NEWTEZ - NOT NEEDED NOW???
+    // TODO replace with auto-reduce parallelism
+    iReduceStageConf.setInt(MRJobConfig.NUM_REDUCES, 2);
     iReduceStageConf.set(MRJobConfig.REDUCE_CLASS_ATTR,
         IntSumReducer.class.getName());
-    iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_KEY_CLASS,
+    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
+    iReduceStageConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS,
         IntWritable.class.getName());
-    iReduceStageConf.set(MRJobConfig.MAP_OUTPUT_VALUE_CLASS,
-        Text.class.getName());
     iReduceStageConf.setBoolean("mapred.mapper.new-api", true);
-
     MRHelpers.translateVertexConfToTez(iReduceStageConf);
 
     Configuration finalReduceConf = new JobConf(conf);
@@ -191,7 +194,8 @@ public class OrderedWordCount extends Configured implements Tool {
         TextOutputFormat.class.getName());
     finalReduceConf.set(FileOutputFormat.OUTDIR, outputPath);
     finalReduceConf.setBoolean("mapred.mapper.new-api", true);
-
+    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
+    finalReduceConf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
     MRHelpers.translateVertexConfToTez(finalReduceConf);
 
     MRHelpers.doJobClientMagic(mapStageConf);
@@ -255,18 +259,25 @@ public class OrderedWordCount extends Configured implements Tool {
     MRHelpers.addMROutputLegacy(finalReduceVertex, finalReducePayload);
     vertices.add(finalReduceVertex);
 
-    OrderedPartitionedKVEdgeConfigurer edgeConf = OrderedPartitionedKVEdgeConfigurer
-        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
-            HashPartitioner.class.getName(), null).configureInput().useLegacyInput().done().build();
-
     DAG dag = new DAG("OrderedWordCount" + dagIndex);
     for (int i = 0; i < vertices.size(); ++i) {
       dag.addVertex(vertices.get(i));
-      if (i != 0) {
-        dag.addEdge(
-            new Edge(vertices.get(i - 1), vertices.get(i), edgeConf.createDefaultEdgeProperty()));
-      }
     }
+
+    OrderedPartitionedKVEdgeConfigurer edgeConf1 = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
+            HashPartitioner.class.getName(), null).configureInput().useLegacyInput().done().build();
+    dag.addEdge(
+        new Edge(dag.getVertex("initialmap"), dag.getVertex("intermediate_reducer"),
+            edgeConf1.createDefaultEdgeProperty()));
+
+    OrderedPartitionedKVEdgeConfigurer edgeConf2 = OrderedPartitionedKVEdgeConfigurer
+        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
+            HashPartitioner.class.getName(), null).configureInput().useLegacyInput().done().build();
+    dag.addEdge(
+        new Edge(dag.getVertex("intermediate_reducer"), dag.getVertex("finalreduce"),
+            edgeConf2.createDefaultEdgeProperty()));
+
     return dag;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/af1884a1/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
index baf0e87..e35bc07 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestTezJobs.java
@@ -25,12 +25,15 @@ import static org.junit.Assert.fail;
 
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,6 +45,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.tez.client.TezClient;
@@ -60,6 +65,7 @@ import org.apache.tez.mapreduce.examples.ExampleDriver;
 import org.apache.tez.mapreduce.examples.IntersectDataGen;
 import org.apache.tez.mapreduce.examples.IntersectExample;
 import org.apache.tez.mapreduce.examples.IntersectValidate;
+import org.apache.tez.mapreduce.examples.OrderedWordCount;
 import org.apache.tez.runtime.library.processor.SleepProcessor;
 import org.apache.tez.runtime.library.processor.SleepProcessor.SleepProcessorConfig;
 import org.junit.AfterClass;
@@ -256,7 +262,7 @@ public class TestTezJobs {
     assertEquals(1, statuses.length);
     FSDataInputStream inStream = remoteFs.open(statuses[0].getPath());
    BufferedReader reader = new BufferedReader(new InputStreamReader(inStream));
-    String line = null;
+    String line;
     while ((line = reader.readLine()) != null) {
       assertTrue(expectedResult.remove(line));
     }
@@ -404,4 +410,151 @@ public class TestTezJobs {
     tezSession.stop();
   }
 
+  private void generateOrderedWordCountInput(Path inputDir) throws IOException {
+    Path dataPath1 = new Path(inputDir, "inPath1");
+    Path dataPath2 = new Path(inputDir, "inPath2");
+
+    FSDataOutputStream f1 = null;
+    FSDataOutputStream f2 = null;
+    try {
+      f1 = remoteFs.create(dataPath1);
+      f2 = remoteFs.create(dataPath2);
+
+      final String prefix = "a";
+      for (int i = 1; i <= 10; ++i) {
+        final String word = prefix + "_" + i;
+        for (int j = 10; j >= i; --j) {
+          LOG.info("Writing " + word + " to input files");
+          f1.write(word.getBytes());
+          f1.writeChars("\t");
+          f2.write(word.getBytes());
+          f2.writeChars("\t");
+        }
+      }
+      f1.hsync();
+      f2.hsync();
+    } finally {
+      if (f1 != null) {
+        f1.close();
+      }
+      if (f2 != null) {
+        f2.close();
+      }
+    }
+  }
+
+  private void verifyOrderedWordCountOutput(Path resultFile) throws IOException {
+    FSDataInputStream inputStream = remoteFs.open(resultFile);
+    final String prefix = "a";
+    int currentCounter = 10;
+
+    byte[] buffer = new byte[4096];
+    int bytesRead = inputStream.read(buffer, 0, 4096);
+
+    BufferedReader reader =
+        new BufferedReader(new InputStreamReader(new ByteArrayInputStream(buffer, 0, bytesRead)));
+
+    String line;
+    while ((line = reader.readLine()) != null) {
+      LOG.info("Line: " + line + ", counter=" + currentCounter);
+      int pos = line.indexOf("\t");
+      String word = line.substring(0, pos-1);
+      Assert.assertEquals(prefix + "_" + currentCounter, word);
+      String val = line.substring(pos+1, line.length());
+      Assert.assertEquals((long)(11 - currentCounter) * 2, (long)Long.valueOf(val));
+      currentCounter--;
+    }
+
+    Assert.assertEquals(0, currentCounter);
+  }
+
+  @Test(timeout = 60000)
+  public void testOrderedWordCount() throws Exception {
+    String inputDirStr = "/tmp/owc-input/";
+    Path inputDir = new Path(inputDirStr);
+    Path stagingDirPath = new Path("/tmp/owc-staging-dir");
+    remoteFs.mkdirs(inputDir);
+    remoteFs.mkdirs(stagingDirPath);
+    generateOrderedWordCountInput(inputDir);
+
+    String outputDirStr = "/tmp/owc-output/";
+    Path outputDir = new Path(outputDirStr);
+
+    TezConfiguration tezConf = new TezConfiguration(mrrTezCluster.getConfig());
+    Path simpleLogPath = new Path("/tmp/owc-logging/");
+    remoteFs.mkdirs(simpleLogPath);
+    simpleLogPath = remoteFs.resolvePath(simpleLogPath);
+    tezConf.set(TezConfiguration.TEZ_SIMPLE_HISTORY_LOGGING_DIR, simpleLogPath.toString());
+    tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDirPath.toString());
+    TezClient tezSession = null;
+
+    try {
+      tezSession = new TezClient("OrderedWordCountSession", tezConf);
+      tezSession.start();
+      tezSession.waitTillReady();
+
+      Map<String, LocalResource> localResourceMap = new TreeMap<String, LocalResource>();
+
+      OrderedWordCount orderedWordCount = new OrderedWordCount();
+      DAG dag = orderedWordCount.createDAG(remoteFs, tezConf, localResourceMap, stagingDirPath,
+          1, inputDirStr, outputDirStr, false);
+
+      DAGClient dagClient = tezSession.submitDAG(dag);
+      DAGStatus dagStatus = dagClient.getDAGStatus(null);
+      while (!dagStatus.isCompleted()) {
+        LOG.info("Waiting for job to complete. Sleeping for 500ms." + " Current state: "
+            + dagStatus.getState());
+        Thread.sleep(500l);
+        dagStatus = dagClient.getDAGStatus(null);
+      }
+      dagStatus = dagClient.getDAGStatus(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
+
+      assertEquals(DAGStatus.State.SUCCEEDED, dagStatus.getState());
+      assertTrue(remoteFs.exists(outputDir));
+      if (tezSession != null) {
+        tezSession.stop();
+        tezSession = null;
+      }
+
+      FileStatus[] fileStatuses = remoteFs.listStatus(outputDir);
+      Path resultFile = null;
+      boolean foundResult = false;
+      boolean foundSuccessFile = false;
+      for (FileStatus fileStatus : fileStatuses) {
+        if (!fileStatus.isFile()) {
+          continue;
+        }
+        if (fileStatus.getPath().getName().equals("_SUCCESS")) {
+          foundSuccessFile = true;
+          continue;
+        }
+        if (fileStatus.getPath().getName().startsWith("part-")) {
+          if (foundResult) {
+            fail("Found 2 part files instead of 1"
+                + ", paths=" + resultFile + "," + fileStatus.getPath());
+          }
+          foundResult = true;
+          resultFile = fileStatus.getPath();
+          LOG.info("Found output at " + resultFile);
+        }
+      }
+      assertTrue(foundResult);
+      assertTrue(resultFile != null);
+      assertTrue(foundSuccessFile);
+      verifyOrderedWordCountOutput(resultFile);
+
+      // check simple history log exists
+      FileStatus[] fileStatuses1 = remoteFs.listStatus(simpleLogPath);
+      Assert.assertEquals(1, fileStatuses1.length);
+      Assert.assertTrue(fileStatuses1[0].getPath().getName().startsWith(
+          SimpleHistoryLoggingService.LOG_FILE_NAME_PREFIX));
+    } finally {
+      if (tezSession != null) {
+        tezSession.stop();
+      }
+    }
+
+  }
+
+
 }

Reply via email to