Author: cdouglas
Date: Fri Apr 25 13:06:42 2008
New Revision: 651693
URL: http://svn.apache.org/viewvc?rev=651693&view=rev
Log:
HADOOP-3295. Allow TextOutputFormat to use configurable separators.
Contributed by Zheng Shao.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
Modified: hadoop/core/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=651693&r1=651692&r2=651693&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Apr 25 13:06:42 2008
@@ -38,6 +38,9 @@
server slowdown. Clients retry connection for up to 15 minutes
when socket connection times out. (hairong)
+ HADOOP-3295. Allow TextOutputFormat to use configurable separators.
+ (Zheng Shao via cdouglas).
+
OPTIMIZATIONS
HADOOP-3274. The default constructor of BytesWritable creates empty
Modified:
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java?rev=651693&r1=651692&r2=651693&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
(original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TextOutputFormat.java
Fri Apr 25 13:06:42 2008
@@ -38,23 +38,31 @@
protected static class LineRecordWriter<K, V>
implements RecordWriter<K, V> {
private static final String utf8 = "UTF-8";
- private static final byte[] tab;
private static final byte[] newline;
static {
try {
- tab = "\t".getBytes(utf8);
newline = "\n".getBytes(utf8);
} catch (UnsupportedEncodingException uee) {
throw new IllegalArgumentException("can't find " + utf8 + " encoding");
}
}
-
+
private DataOutputStream out;
-
- public LineRecordWriter(DataOutputStream out) {
+ private final byte[] keyValueSeparator;
+
+ public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
this.out = out;
+ try {
+ this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
+ } catch (UnsupportedEncodingException uee) {
+ throw new IllegalArgumentException("can't find " + utf8 + " encoding");
+ }
}
-
+
+ public LineRecordWriter(DataOutputStream out) {
+ this(out, "\t");
+ }
+
/**
* Write the object to the byte stream, handling Text as a special
* case.
@@ -82,7 +90,7 @@
writeObject(key);
}
if (!(nullKey || nullValue)) {
- out.write(tab);
+ out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
@@ -94,13 +102,14 @@
out.close();
}
}
-
+
public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
JobConf job,
String name,
Progressable progress)
throws IOException {
+ String keyValueSeparator = job.get("mapred.textoutputformat.separator",
"\t");
Path dir = getWorkOutputPath(job);
FileSystem fs = dir.getFileSystem(job);
if (!fs.exists(dir)) {
@@ -109,9 +118,9 @@
boolean isCompressed = getCompressOutput(job);
if (!isCompressed) {
FSDataOutputStream fileOut = fs.create(new Path(dir, name), progress);
- return new LineRecordWriter<K, V>(fileOut);
+ return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
} else {
- Class<? extends CompressionCodec> codecClass =
+ Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
// create the named codec
CompressionCodec codec = (CompressionCodec)
@@ -120,8 +129,9 @@
Path filename = new Path(dir, name + codec.getDefaultExtension());
FSDataOutputStream fileOut = fs.create(filename, progress);
return new LineRecordWriter<K, V>(new DataOutputStream
- (codec.createOutputStream(fileOut)));
+ (codec.createOutputStream(fileOut)),
+ keyValueSeparator);
}
- }
+ }
}
Modified:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
URL:
http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=651693&r1=651692&r2=651693&view=diff
==============================================================================
---
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
(original)
+++
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java
Fri Apr 25 13:06:42 2008
@@ -36,8 +36,8 @@
}
}
- private static Path workDir =
- new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+ private static Path workDir =
+ new Path(new Path(System.getProperty("test.build.data", "."), "data"),
"TestTextOutputFormat");
@SuppressWarnings("unchecked")
@@ -49,7 +49,7 @@
fail("Failed to create output directory");
}
String file = "test.txt";
-
+
// A reporter that does nothing
Reporter reporter = Reporter.NULL;
@@ -76,7 +76,7 @@
} finally {
theRecordWriter.close(reporter);
}
- File expectedFile = new File(new Path(workDir, file).toString());
+ File expectedFile = new File(new Path(workDir, file).toString());
StringBuffer expectedOutput = new StringBuffer();
expectedOutput.append(key1).append('\t').append(val1).append("\n");
expectedOutput.append(val1).append("\n");
@@ -86,7 +86,58 @@
expectedOutput.append(key2).append('\t').append(val2).append("\n");
String output = UtilsForTests.slurp(expectedFile);
assertEquals(output, expectedOutput.toString());
-
+
+ }
+
+ @SuppressWarnings("unchecked")
+ public void testFormatWithCustomSeparator() throws Exception {
+ JobConf job = new JobConf();
+ String separator = "\u0001";
+ job.set("mapred.textoutputformat.separator", separator);
+ FileOutputFormat.setWorkOutputPath(job, workDir);
+ FileSystem fs = workDir.getFileSystem(job);
+ if (!fs.mkdirs(workDir)) {
+ fail("Failed to create output directory");
+ }
+ String file = "test.txt";
+
+ // A reporter that does nothing
+ Reporter reporter = Reporter.NULL;
+
+ TextOutputFormat theOutputFormat = new TextOutputFormat();
+ RecordWriter theRecordWriter =
+ theOutputFormat.getRecordWriter(localFs, job, file, reporter);
+
+ Text key1 = new Text("key1");
+ Text key2 = new Text("key2");
+ Text val1 = new Text("val1");
+ Text val2 = new Text("val2");
+ NullWritable nullWritable = NullWritable.get();
+
+ try {
+ theRecordWriter.write(key1, val1);
+ theRecordWriter.write(null, nullWritable);
+ theRecordWriter.write(null, val1);
+ theRecordWriter.write(nullWritable, val2);
+ theRecordWriter.write(key2, nullWritable);
+ theRecordWriter.write(key1, null);
+ theRecordWriter.write(null, null);
+ theRecordWriter.write(key2, val2);
+
+ } finally {
+ theRecordWriter.close(reporter);
+ }
+ File expectedFile = new File(new Path(workDir, file).toString());
+ StringBuffer expectedOutput = new StringBuffer();
+ expectedOutput.append(key1).append(separator).append(val1).append("\n");
+ expectedOutput.append(val1).append("\n");
+ expectedOutput.append(val2).append("\n");
+ expectedOutput.append(key2).append("\n");
+ expectedOutput.append(key1).append("\n");
+ expectedOutput.append(key2).append(separator).append(val2).append("\n");
+ String output = UtilsForTests.slurp(expectedFile);
+ assertEquals(output, expectedOutput.toString());
+
}
public static void main(String[] args) throws Exception {