Author: cutting
Date: Fri Jul 1 16:23:39 2011
New Revision: 1141975
URL: http://svn.apache.org/viewvc?rev=1141975&view=rev
Log:
AVRO-847. Java: Add a unit test for Java MapReduce tether. Contributed by
Jeremy Lewi.
Added:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/ (props changed)
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Fri Jul 1 16:23:39 2011
@@ -17,6 +17,8 @@ Avro 1.6.0 (unreleased)
AVRO-841. Java: Implement insertion in GenericData.Array.
(Nick Palmer via cutting)
+ AVRO-847. Java: Add a unit test for Java MapReduce tether. (Jeremy Lewi)
+
BUG FIXES
AVRO-845. setup.py uses Python2.7+ specific code
Propchange: avro/trunk/lang/java/mapred/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Fri Jul 1 16:23:39 2011
@@ -1,4 +1,5 @@
target
+userlogs
.classpath
.settings
.project
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherJob.java Fri Jul 1 16:23:39 2011
@@ -35,6 +35,10 @@ import org.apache.hadoop.mapred.RunningJ
@SuppressWarnings("deprecation")
public class TetherJob extends Configured {
+ public static final String TETHER_EXEC="avro.tether.executable";
+ public static final String TETHER_EXEC_ARGS="avro.tether.executable_args";
+ public static final String TETHER_EXEC_CACHED="avro.tether.executable_cached";
+
/** Get the URI of the application's executable. */
public static URI getExecutable(JobConf job) {
try {
@@ -46,7 +50,23 @@ public class TetherJob extends Configure
/** Set the URI for the application's executable. Normally this in HDFS. */
public static void setExecutable(JobConf job, URI executable) {
- job.set("avro.tether.executable", executable.toString());
+ setExecutable(job,executable,"",false);
+ }
+
+ /**
+ * Set the URI for the application's executable (i.e the program to run in a subprocess
+ * and provides the mapper/reducer).
+ * @param job - Job
+ * @param executable - The URI of the executable
+ * @param argstr - A string of additional arguments
+ * @param cached - If true, the executable URI is cached using DistributedCache
+ * - if false its not cached. I.e if the file is already stored on each local file system
+ * or if its on a NFS share
+ */
+ public static void setExecutable(JobConf job, URI executable,String argstr,boolean cached) {
+ job.set(TETHER_EXEC, executable.toString());
+ job.set(TETHER_EXEC_ARGS, argstr);
+ job.set(TETHER_EXEC_CACHED, (new Boolean(cached)).toString());
}
/** Submit a job to the map/reduce cluster. All of the necessary
@@ -76,6 +96,9 @@ public class TetherJob extends Configure
job.setOutputKeyComparatorClass(TetherKeyComparator.class);
job.setMapOutputValueClass(NullWritable.class);
+ // set the map output key class to TetherData
+ job.setMapOutputKeyClass(TetherData.class);
+
// add TetherKeySerialization to io.serializations
Collection<String> serializations =
job.getStringCollection("io.serializations");
@@ -84,8 +107,11 @@ public class TetherJob extends Configure
job.setStrings("io.serializations",
serializations.toArray(new String[0]));
}
-
- DistributedCache.addCacheFile(getExecutable(job), job);
+
+ // determine whether the executable should be added to the cache.
+ if (job.getBoolean(TETHER_EXEC_CACHED,false)){
+ DistributedCache.addCacheFile(getExecutable(job), job);
+ }
}
}
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetherKeySerialization.java Fri Jul 1 16:23:39 2011
@@ -75,7 +75,7 @@ class TetherKeySerialization
public void open(OutputStream out) {
this.out = out;
- this.encoder = EncoderFactory.get().binaryEncoder(out, null);
+ this.encoder = EncoderFactory.get().directBinaryEncoder(out, encoder);
}
public void serialize(TetherData datum) throws IOException {
Modified:
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/tether/TetheredProcess.java Fri Jul 1 16:23:39 2011
@@ -38,14 +38,19 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileUtil;
import org.apache.avro.ipc.Transceiver;
-import org.apache.avro.ipc.SocketTransceiver;
import org.apache.avro.ipc.Server;
-import org.apache.avro.ipc.SocketServer;
+import org.apache.avro.ipc.SaslSocketServer;
+import org.apache.avro.ipc.SaslSocketTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
class TetheredProcess {
+ static final Logger LOG = LoggerFactory.getLogger(TetherMapRunner.class);
+
private JobConf job;
TetherOutputService outputService;
@@ -60,17 +65,28 @@ class TetheredProcess {
try {
// start server
this.outputService = new TetherOutputService(collector, reporter);
- this.outputServer = new SocketServer
+ this.outputServer = new SaslSocketServer
(new SpecificResponder(OutputProtocol.class, outputService),
new InetSocketAddress(0));
outputServer.start();
// start sub-process, connecting back to server
this.subprocess = startSubprocess(job);
-
- // open client, connecting to sub-process
- this.clientTransceiver =
- new SocketTransceiver(new InetSocketAddress(outputService.inputPort()));
+
+ // check if the process has exited -- is there a better way to do this?
+ boolean hasexited = false;
+ try {
+ // exitValue throws an exception if process hasn't exited
+ this.subprocess.exitValue();
+ hasexited = true;
+ } catch (IllegalThreadStateException e) {
+ }
+ if (hasexited) {
+ LOG.error("Could not start subprocess");
+ throw new RuntimeException("Could not start subprocess");
+ }
+ this.clientTransceiver
+ = new SaslSocketTransceiver(new InetSocketAddress(outputService.inputPort()));
this.inputClient =
SpecificRequestor.getClient(InputProtocol.class, clientTransceiver);
@@ -96,15 +112,39 @@ class TetheredProcess {
throws IOException, InterruptedException {
// get the executable command
List<String> command = new ArrayList<String>();
- Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
- if (localFiles == null) { // until MAPREDUCE-476
- URI[] files = DistributedCache.getCacheFiles(job);
- localFiles = new Path[] { new Path(files[0].toString()) };
+
+ String executable="";
+ if (job.getBoolean(TetherJob.TETHER_EXEC_CACHED,false)){
+ //we want to use the cached executable
+ Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
+ if (localFiles == null) { // until MAPREDUCE-476
+ URI[] files = DistributedCache.getCacheFiles(job);
+ localFiles = new Path[] { new Path(files[0].toString()) };
+ }
+ executable=localFiles[0].toString();
+ FileUtil.chmod(executable.toString(), "a+x");
+ }
+ else {
+ executable=job.get(TetherJob.TETHER_EXEC);
}
- String executable = localFiles[0].toString();
- FileUtil.chmod(executable, "a+x");
+
command.add(executable);
+ // Add the executable arguments. We assume the arguments are separated by
+ // spaces so we split the argument string based on spaces and add each
+ // token to command We need to do it this way because
+ // TaskLog.captureOutAndError will put quote marks around each argument so
+ // if we pass a single string containing all arguments we get quoted
+ // incorrectly
+ String args=job.get(TetherJob.TETHER_EXEC_ARGS);
+ String[] aparams=args.split(" ");
+ for (int i=0;i<aparams.length; i++){
+ aparams[i]=aparams[i].trim();
+ if (aparams[i].length()>0){
+ command.add(aparams[i]);
+ }
+ }
+
if (System.getProperty("hadoop.log.dir") == null
&& System.getenv("HADOOP_LOG_DIR") != null)
System.setProperty("hadoop.log.dir", System.getenv("HADOOP_LOG_DIR"));
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/WordCountUtil.java Fri Jul 1 16:23:39 2011
@@ -50,15 +50,15 @@ import org.apache.avro.specific.Specific
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.DataFileStream;
-class WordCountUtil {
+public class WordCountUtil {
private static final File DIR
= new File(System.getProperty("test.dir", ".") + "/mapred");
- private static final File LINES_FILE
+ public static final File LINES_FILE
= new File(new File(DIR, "in"), "lines.avro");
private static final File LINES_TEXT_FILE
= new File(new File(DIR, "in"), "lines.txt");
- private static final File COUNTS_FILE
+ public static final File COUNTS_FILE
= new File(new File(DIR, "out"), "part-00000.avro");
private static final File SORTED_FILE
= new File(new File(DIR, "out"), "part-00000.avro");
Added:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java?rev=1141975&view=auto
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java (added)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/TestWordCountTether.java Fri Jul 1 16:23:39 2011
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapred.tether;
+
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.StringTokenizer;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import junit.framework.Assert;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.Protocol;
+import org.junit.Test;
+
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.WordCountUtil;
+import org.apache.avro.mapred.Pair;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.DataFileStream;
+
+public class TestWordCountTether {
+
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testJob() throws Exception {
+
+ System.out.println(System.getProperty("java.class.path"));
+ JobConf job = new JobConf();
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out");
+
+ outputPath.getFileSystem(job).delete(outputPath);
+
+ // create the input file
+ WordCountUtil.writeLinesFile();
+
+ java.net.URI exec= new java.net.URI("java");
+
+ //input path
+ String in=dir+"/in";
+
+ //create a string of the arguments
+ String execargs="-classpath " + System.getProperty("java.class.path");
+ execargs+= " org.apache.avro.mapred.tether.WordCountTask";
+
+ FileInputFormat.addInputPaths(job, in);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ TetherJob.setExecutable(job, exec,execargs,false);
+
+ Schema outscheme= new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema();
+ AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
+ job.set(AvroJob.OUTPUT_SCHEMA, outscheme.toString());
+
+ TetherJob.runJob(job);
+
+ // validate the output
+ DatumReader<Pair<Utf8,Long>> reader
+ = new SpecificDatumReader<Pair<Utf8,Long>>();
+ InputStream cin = new BufferedInputStream(new FileInputStream(WordCountUtil.COUNTS_FILE));
+ DataFileStream<Pair<Utf8,Long>> counts
+ = new DataFileStream<Pair<Utf8,Long>>(cin,reader);
+ int numWords = 0;
+ for (Pair<Utf8,Long> wc : counts) {
+ assertEquals(wc.key().toString(),
+ WordCountUtil.COUNTS.get(wc.key().toString()), wc.value());
+ numWords++;
+ }
+
+ cin.close();
+ assertEquals(WordCountUtil.COUNTS.size(), numWords);
+
+ }
+
+
+}
Modified:
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java?rev=1141975&r1=1141974&r2=1141975&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/tether/WordCountTask.java Fri Jul 1 16:23:39 2011
@@ -50,7 +50,7 @@ public class WordCountTask
sum = 0;
}
- public static void main(String... args) throws Exception {
+ public static void main(String[] args) throws Exception {
new TetherTaskRunner(new WordCountTask()).join();
}