Author: omalley
Date: Mon Jun 30 09:22:50 2008
New Revision: 672807
URL: http://svn.apache.org/viewvc?rev=672807&view=rev
Log:
HADOOP-3341. Allow streaming jobs to specify the field separator for map
and reduce input and output. The new configuration values are:
stream.map.input.field.separator
stream.map.output.field.separator
stream.reduce.input.field.separator
stream.reduce.output.field.separator
All of them default to "\t". Contributed by Zheng Shao.
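The separators are read as UTF-8 byte sequences (see the getBytes("UTF-8") calls below), so multi-byte separators work as well. A job can override the defaults by passing the properties through the job configuration, the same way the new TestStreamingSeparator test does. A minimal sketch (the input/output paths and the mapper/reducer commands are illustrative only):

    String[] args = new String[] {
      "-input", "in", "-output", "out",
      "-mapper", "/bin/cat",
      "-reducer", "/usr/bin/uniq",
      "-jobconf", "stream.map.output.field.separator=,",
      "-jobconf", "stream.reduce.input.field.separator=,",
    };
    StreamJob job = new StreamJob(args, false);  // mayExit = false
    job.go();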
Added:
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/build.xml
hadoop/core/trunk/src/contrib/build.xml
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jun 30 09:22:50 2008
@@ -10,6 +10,14 @@
NEW FEATURES
+ HADOOP-3341. Allow streaming jobs to specify the field separator for map
+ and reduce input and output. The new configuration values are:
+ stream.map.input.field.separator
+ stream.map.output.field.separator
+ stream.reduce.input.field.separator
+ stream.reduce.output.field.separator
+ All of them default to "\t". (Zheng Shao via omalley)
+
IMPROVEMENTS
HADOOP-3577. Tools to inject blocks into name node and simulated
data nodes for testing. (Sanjay Radia via hairong)
Modified: hadoop/core/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/build.xml?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/build.xml (original)
+++ hadoop/core/trunk/build.xml Mon Jun 30 09:22:50 2008
@@ -642,7 +642,7 @@
<fail if="tests.failed">Tests failed!</fail>
</target>
- <target name="test-contrib" depends="compile-core, compile-core-test" description="Run contrib unit tests">
+ <target name="test-contrib" depends="compile, compile-core-test" description="Run contrib unit tests">
<subant target="test">
<property name="version" value="${version}"/>
<fileset file="${contrib.dir}/build.xml"/>
Modified: hadoop/core/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build.xml?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build.xml (original)
+++ hadoop/core/trunk/src/contrib/build.xml Mon Jun 30 09:22:50 2008
@@ -46,7 +46,7 @@
<!-- ====================================================== -->
<target name="test">
<subant target="test">
- <fileset dir="." includes="*/build.xml"/>
+ <fileset dir="." includes="streaming/build.xml"/>
</subant>
</target>
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapRed.java Mon Jun 30 09:22:50 2008
@@ -53,8 +53,8 @@
*/
abstract String getPipeCommand(JobConf job);
- abstract char getFieldSeparator();
-
+ abstract byte[] getFieldSeparator();
+
abstract int getNumOfKeyFields();
abstract boolean getDoPipe();
@@ -120,13 +120,6 @@
job_ = job;
fs_ = FileSystem.get(job_);
- String mapOutputFieldSeparator = job_.get("stream.map.output.field.separator", "\t");
- String reduceOutputFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t");
- this.mapOutputFieldSeparator = mapOutputFieldSeparator.charAt(0);
- this.reduceOutFieldSeparator = reduceOutputFieldSeparator.charAt(0);
- this.numOfMapOutputKeyFields = job_.getInt("stream.num.map.output.key.fields", 1);
- this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
-
nonZeroExitIsFailure_ =
job_.getBoolean("stream.non.zero.exit.is.failure", true);
doPipe_ = getDoPipe();
@@ -317,7 +310,7 @@
}
/**
- * Split a line into key and value. Assume the delimitor is a tab.
+ * Split a line into key and value.
* @param line: a byte array of line containing UTF-8 bytes
* @param key: key of a record
* @param val: value of a record
@@ -325,14 +318,21 @@
*/
void splitKeyVal(byte[] line, int length, Text key, Text val)
throws IOException {
- int pos = UTF8ByteArrayUtils.findNthByte(line, 0, length,
- (byte)this.getFieldSeparator(), this.getNumOfKeyFields());
+ int numKeyFields = getNumOfKeyFields();
+ byte[] separator = getFieldSeparator();
+
+ // Need to find numKeyFields separators
+ int pos = UTF8ByteArrayUtils.findBytes(line, 0, line.length, separator);
+ for(int k=1; k<numKeyFields && pos!=-1; k++) {
+ pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length, line.length, separator);
+ }
try {
if (pos == -1) {
key.set(line, 0, length);
val.set("");
} else {
- UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos);
+ UTF8ByteArrayUtils.splitKeyVal(line, 0, length, key, val, pos, separator.length);
}
} catch (CharacterCodingException e) {
LOG.warn(StringUtils.stringifyException(e));
@@ -647,10 +647,4 @@
String LOGNAME;
PrintStream log_;
- protected char mapOutputFieldSeparator = '\t';
- protected char reduceOutFieldSeparator = '\t';
- protected int numOfMapOutputKeyFields = 1;
- protected int numOfMapOutputPartitionFields = 1;
- protected int numOfReduceOutputKeyFields = 1;
-
}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeMapper.java Mon Jun 30 09:22:50 2008
@@ -34,6 +34,10 @@
public class PipeMapper extends PipeMapRed implements Mapper {
private boolean ignoreKey = false;
+
+ private byte[] mapOutputFieldSeparator;
+ private byte[] mapInputFieldSeparator;
+ private int numOfMapOutputKeyFields = 1;
String getPipeCommand(JobConf job) {
String str = job.get("stream.map.streamprocessor");
@@ -56,7 +60,15 @@
public void configure(JobConf job) {
super.configure(job);
String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
- this.ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+ ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
+
+ try {
+ mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "\t").getBytes("UTF-8");
+ mapInputFieldSeparator = job.get("stream.map.input.field.separator", "\t").getBytes("UTF-8");
+ numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
+ }
}
// Do NOT declare default constructor
@@ -85,7 +97,7 @@
if (numExceptions_ == 0) {
if (!this.ignoreKey) {
write(key);
- clientOut_.write('\t');
+ clientOut_.write(getInputSeparator());
}
write(value);
clientOut_.write('\n');
@@ -112,14 +124,18 @@
mapRedFinished();
}
+ byte[] getInputSeparator() {
+ return mapInputFieldSeparator;
+ }
+
@Override
- char getFieldSeparator() {
- return super.mapOutputFieldSeparator;
+ byte[] getFieldSeparator() {
+ return mapOutputFieldSeparator;
}
@Override
int getNumOfKeyFields() {
- return super.numOfMapOutputKeyFields;
+ return numOfMapOutputKeyFields;
}
}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/PipeReducer.java Mon Jun 30 09:22:50 2008
@@ -36,6 +36,10 @@
*/
public class PipeReducer extends PipeMapRed implements Reducer {
+ private byte[] reduceOutFieldSeparator;
+ private byte[] reduceInputFieldSeparator;
+ private int numOfReduceOutputKeyFields = 1;
+
String getPipeCommand(JobConf job) {
String str = job.get("stream.reduce.streamprocessor");
if (str == null) {
@@ -55,6 +59,18 @@
return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
}
+ public void configure(JobConf job) {
+ super.configure(job);
+
+ try {
+ reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "\t").getBytes("UTF-8");
+ reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "\t").getBytes("UTF-8");
+ this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
+ }
+ }
+
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
@@ -75,7 +91,7 @@
outerrThreadsThrowable));
}
write(key);
- clientOut_.write('\t');
+ clientOut_.write(getInputSeparator());
write(val);
clientOut_.write('\n');
} else {
@@ -109,14 +125,18 @@
mapRedFinished();
}
- @Override
- char getFieldSeparator() {
- return super.reduceOutFieldSeparator;
+ byte[] getInputSeparator() {
+ return reduceInputFieldSeparator;
}
@Override
+ byte[] getFieldSeparator() {
+ return reduceOutFieldSeparator;
+ }
+
+ @Override
int getNumOfKeyFields() {
- return super.numOfReduceOutputKeyFields;
+ return numOfReduceOutputKeyFields;
}
}
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/UTF8ByteArrayUtils.java Mon Jun 30 09:22:50 2008
@@ -60,7 +60,32 @@
}
return -1;
}
-
+
+ /**
+ * Find the first occurrence of the given bytes b in a UTF-8 encoded string
+ * @param utf a byte array containing a UTF-8 encoded string
+ * @param start starting offset
+ * @param end ending position
+ * @param b the bytes to find
+ * @return the position where b first occurs, otherwise -1
+ */
+ public static int findBytes(byte [] utf, int start, int end, byte[] b) {
+ int matchEnd = end - b.length;
+ for(int i=start; i<=matchEnd; i++) {
+ boolean matched = true;
+ for(int j=0; j<b.length; j++) {
+ if (utf[i+j] != b[j]) {
+ matched = false;
+ break;
+ }
+ }
+ if (matched) {
+ return i;
+ }
+ }
+ return -1;
+ }
+
/**
* Find the nth occurrence of the given byte b in a UTF-8 encoded string
* @param utf a byte array containing a UTF-8 encoded string
@@ -112,23 +137,57 @@
* @param key contains key upon the method is returned
* @param val contains value upon the method is returned
* @param splitPos the split pos
+ * @param separatorLength the length of the separator between key and value
* @throws IOException
*/
public static void splitKeyVal(byte[] utf, int start, int length,
- Text key, Text val, int splitPos) throws IOException {
+ Text key, Text val, int splitPos,
+ int separatorLength) throws IOException {
if (splitPos<start || splitPos >= (start+length))
throw new IllegalArgumentException("splitPos must be in the range " +
"[" + start + ", " + (start+length) +
"]: " + splitPos);
int keyLen = (splitPos-start);
byte [] keyBytes = new byte[keyLen];
System.arraycopy(utf, start, keyBytes, 0, keyLen);
- int valLen = (start+length)-splitPos-1;
+ int valLen = (start+length)-splitPos-separatorLength;
byte [] valBytes = new byte[valLen];
- System.arraycopy(utf, splitPos+1, valBytes, 0, valLen);
+ System.arraycopy(utf, splitPos+separatorLength, valBytes, 0, valLen);
key.set(keyBytes);
val.set(valBytes);
}
-
+
+ /**
+ * split a UTF-8 byte array into key and value
+ * assuming that the delimiter is at splitPos.
+ * @param utf utf-8 encoded string
+ * @param start starting offset
+ * @param length no. of bytes
+ * @param key contains key upon the method is returned
+ * @param val contains value upon the method is returned
+ * @param splitPos the split pos
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, int start, int length,
+ Text key, Text val, int splitPos) throws IOException {
+ splitKeyVal(utf, start, length, key, val, splitPos, 1);
+ }
+
+
+ /**
+ * split a UTF-8 byte array into key and value
+ * assuming that the delimiter is at splitPos.
+ * @param utf utf-8 encoded string
+ * @param key contains key upon the method is returned
+ * @param val contains value upon the method is returned
+ * @param splitPos the split pos
+ * @param separatorLength the length of the separator between key and value
+ * @throws IOException
+ */
+ public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos,
+ int separatorLength)
+ throws IOException {
+ splitKeyVal(utf, 0, utf.length, key, val, splitPos, separatorLength);
+ }
/**
* split a UTF-8 byte array into key and value
@@ -141,9 +200,9 @@
*/
public static void splitKeyVal(byte[] utf, Text key, Text val, int splitPos)
throws IOException {
- splitKeyVal(utf, 0, utf.length, key, val, splitPos);
+ splitKeyVal(utf, 0, utf.length, key, val, splitPos, 1);
}
-
+
/**
* Read a utf8 encoded line from a data input stream.
* @param lineReader LineReader to read the line from.
Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java?rev=672807&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TestStreamingSeparator.java Mon Jun 30 09:22:50 2008
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import junit.framework.TestCase;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * This class tests hadoopStreaming with customized separator in MapReduce local mode.
+ */
+public class TestStreamingSeparator extends TestCase
+{
+
+ // "map" command: grep -E (red|green|blue)
+ // reduce command: uniq
+ protected File INPUT_FILE = new File("TestStreamingSeparator.input.txt");
+ protected File OUTPUT_DIR = new File("TestStreamingSeparator.out");
+ protected String input = "roses1are.red\nviolets1are.blue\nbunnies1are.pink\n";
+ // key.value.separator.in.input.line reads 1 as separator
+ // stream.map.input.field.separator uses 2 as separator
+ // map behaves like "/usr/bin/tr 2 3"; (translate 2 to 3)
+ protected String map = StreamUtil.makeJavaCommand(TrApp.class, new String[]{"2", "3"});
+ // stream.map.output.field.separator recognizes 3 as separator
+ // stream.reduce.input.field.separator recognizes 3 as separator
+ // reduce behaves like "/usr/bin/tr 3 4"; (translate 3 to 4)
+ protected String reduce = StreamUtil.makeJavaCommand(TrAppReduce.class, new String[]{"3", "4"});
+ // stream.reduce.output.field.separator recognizes 4 as separator
+ // mapred.textoutputformat.separator outputs 5 as separator
+ protected String outputExpect = "bunnies5are.pink\nroses5are.red\nviolets5are.blue\n";
+
+ private StreamJob job;
+
+ public TestStreamingSeparator() throws IOException
+ {
+ UtilTest utilTest = new UtilTest(getClass().getName());
+ utilTest.checkUserDir();
+ utilTest.redirectIfAntJunit();
+ }
+
+ protected void createInput() throws IOException
+ {
+ DataOutputStream out = new DataOutputStream(
+ new FileOutputStream(INPUT_FILE.getAbsoluteFile()));
+ out.write(input.getBytes("UTF-8"));
+ out.close();
+ }
+
+ protected String[] genArgs() {
+ return new String[] {
+ "-input", INPUT_FILE.getAbsolutePath(),
+ "-output", OUTPUT_DIR.getAbsolutePath(),
+ "-mapper", map,
+ "-reducer", reduce,
+ //"-verbose",
+ //"-jobconf", "stream.debug=set"
+ "-jobconf", "keep.failed.task.files=true",
+ "-jobconf",
"stream.tmpdir="+System.getProperty("test.build.data","/tmp"),
+ "-inputformat", "KeyValueTextInputFormat",
+ "-jobconf", "key.value.separator.in.input.line=1",
+ "-jobconf", "stream.map.input.field.separator=2",
+ "-jobconf", "stream.map.output.field.separator=3",
+ "-jobconf", "stream.reduce.input.field.separator=3",
+ "-jobconf", "stream.reduce.output.field.separator=4",
+ "-jobconf", "mapred.textoutputformat.separator=5",
+ };
+ }
+
+ public void testCommandLine()
+ {
+ try {
+ try {
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ } catch (Exception e) {
+ }
+
+ createInput();
+ boolean mayExit = false;
+
+ // During tests, the default Configuration will use a local mapred
+ // So don't specify -config or -cluster
+ job = new StreamJob(genArgs(), mayExit);
+ job.go();
+ File outFile = new File(OUTPUT_DIR, "part-00000").getAbsoluteFile();
+ String output = StreamUtil.slurp(outFile);
+ outFile.delete();
+ System.err.println("outEx1=" + outputExpect);
+ System.err.println(" out1=" + output);
+ assertEquals(outputExpect, output);
+ } catch(Exception e) {
+ failTrace(e);
+ } finally {
+ File outFileCRC = new File(OUTPUT_DIR, ".part-00000.crc").getAbsoluteFile();
+ INPUT_FILE.delete();
+ outFileCRC.delete();
+ OUTPUT_DIR.getAbsoluteFile().delete();
+ }
+ }
+
+ private void failTrace(Exception e)
+ {
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ fail(sw.toString());
+ }
+
+ public static void main(String[]args) throws Exception
+ {
+ new TestStreamingSeparator().testCommandLine();
+ }
+
+}
Modified: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java?rev=672807&r1=672806&r2=672807&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrApp.java Mon Jun 30 09:22:50 2008
@@ -41,7 +41,6 @@
// test that some JobConf properties are exposed as expected
// Note the dots translated to underscore:
// property names have been escaped in PipeMapRed.safeEnvVarName()
- expect("mapred_input_format_class",
"org.apache.hadoop.mapred.TextInputFormat");
expect("mapred_job_tracker", "local");
//expect("mapred_local_dir", "build/test/mapred/local");
expectDefined("mapred_local_dir");
Added: hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java?rev=672807&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java (added)
+++ hadoop/core/trunk/src/contrib/streaming/src/test/org/apache/hadoop/streaming/TrAppReduce.java Mon Jun 30 09:22:50 2008
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming;
+
+import java.io.*;
+
+import org.apache.hadoop.streaming.Environment;
+
+/** A minimal Java implementation of /usr/bin/tr.
+ Used to test the usage of external applications without adding
+ platform-specific dependencies.
+ */
+public class TrAppReduce
+{
+
+ public TrAppReduce(char find, char replace)
+ {
+ this.find = find;
+ this.replace = replace;
+ }
+
+ void testParentJobConfToEnvVars() throws IOException
+ {
+ env = new Environment();
+ // test that some JobConf properties are exposed as expected
+ // Note the dots translated to underscore:
+ // property names have been escaped in PipeMapRed.safeEnvVarName()
+ expect("mapred_job_tracker", "local");
+ //expect("mapred_local_dir", "build/test/mapred/local");
+ expectDefined("mapred_local_dir");
+ expect("mapred_output_format_class",
"org.apache.hadoop.mapred.TextOutputFormat");
+ expect("mapred_output_key_class", "org.apache.hadoop.io.Text");
+ expect("mapred_output_value_class", "org.apache.hadoop.io.Text");
+
+ expect("mapred_task_is_map", "false");
+ expectDefined("mapred_task_id");
+
+ expectDefined("io_sort_factor");
+
+ // the FileSplit context properties are not available in local hadoop..
+ // so can't check them in this test.
+
+ }
+
+ // this runs in a subprocess; won't use JUnit's assertTrue()
+ void expect(String evName, String evVal) throws IOException
+ {
+ String got = env.getProperty(evName);
+ if (!evVal.equals(got)) {
+ String msg = "FAIL evName=" + evName + " got=" + got + " expect=" +
evVal;
+ throw new IOException(msg);
+ }
+ }
+
+ void expectDefined(String evName) throws IOException
+ {
+ String got = env.getProperty(evName);
+ if (got == null) {
+ String msg = "FAIL evName=" + evName + " is undefined. Expect defined.";
+ throw new IOException(msg);
+ }
+ }
+
+ public void go() throws IOException
+ {
+ testParentJobConfToEnvVars();
+ BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
+ String line;
+
+ while ((line = in.readLine()) != null) {
+ String out = line.replace(find, replace);
+ System.out.println(out);
+ }
+ }
+
+ public static void main(String[] args) throws IOException
+ {
+ args[0] = CUnescape(args[0]);
+ args[1] = CUnescape(args[1]);
+ TrAppReduce app = new TrAppReduce(args[0].charAt(0), args[1].charAt(0));
+ app.go();
+ }
+
+ public static String CUnescape(String s)
+ {
+ if (s.equals("\\n")) {
+ return "\n";
+ } else {
+ return s;
+ }
+ }
+ char find;
+ char replace;
+ Environment env;
+}