Author: shv
Date: Mon May 18 18:15:29 2009
New Revision: 776032
URL: http://svn.apache.org/viewvc?rev=776032&view=rev
Log:
HADOOP-5858. Eliminate UTF8 and fix warnings in test/hdfs-with-mr package.
Contributed by Konstantin Shvachko.
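For context, the change replaces the deprecated org.apache.hadoop.io.UTF8 key/value type with org.apache.hadoop.io.Text throughout these tests and moves the "l:"/"f:"/"s:" key prefixes into shared constants on AccumulatingReducer. Below is a minimal sketch of the substitution pattern; the class name StatCollectionSketch is made up for illustration only, and the authoritative hunks follow.

    package org.apache.hadoop.fs;

    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;

    class StatCollectionSketch {
      // Before this change: output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
      // After: the same record is emitted with Text keys/values and the shared prefix constant.
      static void emitTaskCount(OutputCollector<Text, Text> output) throws IOException {
        output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
                       new Text(String.valueOf(1)));
      }
    }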
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java
hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java
hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java
hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java
hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java
hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java
hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon May 18 18:15:29 2009
@@ -364,6 +364,9 @@
HADOOP-5369. Small tweaks to reduce MapFile index size. (Ben Maurer
via sharad)
+ HADOOP-5858. Eliminate UTF8 and fix warnings in test/hdfs-with-mr package.
+ (shv)
+
OPTIMIZATIONS
HADOOP-5595. NameNode does not need to run a replicator to choose a
Modified: hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java (original)
+++ hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/AccumulatingReducer.java Mon May 18 18:15:29 2009
@@ -22,8 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
@@ -46,7 +45,10 @@
*
*/
public class AccumulatingReducer extends MapReduceBase
- implements Reducer<UTF8, UTF8, UTF8, UTF8> {
+ implements Reducer<Text, Text, Text, Text> {
+ static final String VALUE_TYPE_LONG = "l:";
+ static final String VALUE_TYPE_FLOAT = "f:";
+ static final String VALUE_TYPE_STRING = "s:";
private static final Log LOG = LogFactory.getLog(AccumulatingReducer.class);
protected String hostName;
@@ -61,9 +63,9 @@
LOG.info("Starting AccumulatingReducer on " + hostName);
}
- public void reduce(UTF8 key,
- Iterator<UTF8> values,
- OutputCollector<UTF8, UTF8> output,
+ public void reduce(Text key,
+ Iterator<Text> values,
+ OutputCollector<Text, Text> output,
Reporter reporter
) throws IOException {
String field = key.toString();
@@ -71,30 +73,30 @@
reporter.setStatus("starting " + field + " ::host = " + hostName);
// concatenate strings
- if (field.startsWith("s:")) {
+ if (field.startsWith(VALUE_TYPE_STRING)) {
String sSum = "";
while (values.hasNext())
sSum += values.next().toString() + ";";
- output.collect(key, new UTF8(sSum));
+ output.collect(key, new Text(sSum));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum long values
- if (field.startsWith("f:")) {
+ if (field.startsWith(VALUE_TYPE_FLOAT)) {
float fSum = 0;
while (values.hasNext())
fSum += Float.parseFloat(values.next().toString());
- output.collect(key, new UTF8(String.valueOf(fSum)));
+ output.collect(key, new Text(String.valueOf(fSum)));
reporter.setStatus("finished " + field + " ::host = " + hostName);
return;
}
// sum long values
- if (field.startsWith("l:")) {
+ if (field.startsWith(VALUE_TYPE_LONG)) {
long lSum = 0;
while (values.hasNext()) {
lSum += Long.parseLong(values.next().toString());
}
- output.collect(key, new UTF8(String.valueOf(lSum)));
+ output.collect(key, new Text(String.valueOf(lSum)));
}
reporter.setStatus("finished " + field + " ::host = " + hostName);
}
Modified: hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java (original)
+++ hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DFSCIOTest.java Mon May 18 18:15:29 2009
@@ -18,18 +18,26 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
/**
* Distributed i/o benchmark.
@@ -60,6 +68,7 @@
*/
public class DFSCIOTest extends TestCase {
// Constants
+ private static final Log LOG = LogFactory.getLog(DFSCIOTest.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
@@ -67,7 +76,6 @@
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "DFSCIOTest_results.log";
- private static final Log LOG = FileInputFormat.LOG;
private static Configuration fsConfig = new Configuration();
private static final long MEGA = 0x100000;
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/DFSCIOTest");
@@ -124,9 +132,9 @@
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- UTF8.class, LongWritable.class,
+ Text.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new UTF8(name), new LongWritable(fileSize));
+ writer.append(new Text(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -159,7 +167,7 @@
super(fsConfig);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Object objSize) throws IOException {
@@ -169,11 +177,16 @@
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new
UTF8(String.valueOf(ioRateMbSec*1000)));
- output.collect(new UTF8("f:sqrate"), new
UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+ new Text(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+ new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
@@ -274,8 +287,8 @@
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
Modified: hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java (original)
+++ hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/DistributedFSCheck.java Mon May 18 18:15:29 2009
@@ -18,20 +18,28 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
import java.util.TreeSet;
import java.util.Vector;
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.io.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.*;
/**
* Distributed checkup of the file system consistency.
@@ -45,6 +53,7 @@
*/
public class DistributedFSCheck extends TestCase {
// Constants
+ private static final Log LOG = LogFactory.getLog(DistributedFSCheck.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_CLEANUP = 2;
private static final int DEFAULT_BUFFER_SIZE = 1000000;
@@ -52,7 +61,6 @@
private static final long MEGA = 0x100000;
private static Configuration fsConfig = new Configuration();
- private static final Log LOG = FileInputFormat.LOG;
private static Path TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/benchmarks/DistributedFSCheck"));
private static Path MAP_INPUT_DIR = new Path(TEST_ROOT_DIR, "map_input");
private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read");
@@ -92,7 +100,7 @@
Path inputFile = new Path(MAP_INPUT_DIR, "in_file");
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, fsConfig, inputFile,
- UTF8.class, LongWritable.class, CompressionType.NONE);
+ Text.class, LongWritable.class, CompressionType.NONE);
try {
nrFiles = 0;
@@ -120,7 +128,7 @@
long blockSize = fs.getDefaultBlockSize();
long fileLength = rootStatus.getLen();
for(long offset = 0; offset < fileLength; offset += blockSize)
- writer.append(new UTF8(rootFile.toString()), new LongWritable(offset));
+ writer.append(new Text(rootFile.toString()), new LongWritable(offset));
return;
}
@@ -171,14 +179,17 @@
return new Long(actualSize);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Object corruptedBlock) throws IOException {
- output.collect(new UTF8("l:blocks"), new UTF8(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "blocks"),
+ new Text(String.valueOf(1)));
if (corruptedBlock.getClass().getName().endsWith("String")) {
- output.collect(new UTF8("s:badBlocks"), new
UTF8((String)corruptedBlock));
+ output.collect(
+ new Text(AccumulatingReducer.VALUE_TYPE_STRING + "badBlocks"),
+ new Text((String)corruptedBlock));
return;
}
long totalSize = ((Long)corruptedBlock).longValue();
@@ -187,9 +198,12 @@
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new
UTF8(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
}
}
@@ -203,8 +217,8 @@
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, READ_DIR);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
Modified: hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java (original)
+++ hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/IOMapperBase.java Mon May 18 18:15:29 2009
@@ -22,9 +22,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
@@ -40,7 +38,7 @@
*
*/
public abstract class IOMapperBase extends Configured
- implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
+ implements Mapper<Text, LongWritable, Text, Text> {
protected byte[] buffer;
protected int bufferSize;
@@ -93,7 +91,7 @@
* @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
* @throws IOException
*/
- abstract void collectStats(OutputCollector<UTF8, UTF8> output,
+ abstract void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Object doIOReturnValue) throws IOException;
@@ -111,9 +109,9 @@
* {@link #collectStats(OutputCollector,String,long,Object)}
* is called to prepare stat data for a subsequent reducer.
*/
- public void map(UTF8 key,
+ public void map(Text key,
LongWritable value,
- OutputCollector<UTF8, UTF8> output,
+ OutputCollector<Text, Text> output,
Reporter reporter) throws IOException {
String name = key.toString();
long longValue = value.get();
Modified: hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestDFSIO.java Mon May 18 18:15:29 2009
@@ -18,19 +18,28 @@
package org.apache.hadoop.fs;
-import java.io.*;
-
-import junit.framework.TestCase;
+import java.io.BufferedReader;
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.PrintStream;
import java.util.Date;
import java.util.StringTokenizer;
-import org.apache.commons.logging.*;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.conf.*;
/**
* Distributed i/o benchmark.
@@ -61,6 +70,7 @@
*/
public class TestDFSIO extends TestCase {
// Constants
+ private static final Log LOG = LogFactory.getLog(TestDFSIO.class);
private static final int TEST_TYPE_READ = 0;
private static final int TEST_TYPE_WRITE = 1;
private static final int TEST_TYPE_CLEANUP = 2;
@@ -68,7 +78,6 @@
private static final String BASE_FILE_NAME = "test_io_";
private static final String DEFAULT_RES_FILE_NAME = "TestDFSIO_results.log";
- private static final Log LOG = FileInputFormat.LOG;
private static Configuration fsConfig = new Configuration();
private static final long MEGA = 0x100000;
private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO");
@@ -119,9 +128,9 @@
SequenceFile.Writer writer = null;
try {
writer = SequenceFile.createWriter(fs, fsConfig, controlFile,
- UTF8.class, LongWritable.class,
+ Text.class, LongWritable.class,
CompressionType.NONE);
- writer.append(new UTF8(name), new LongWritable(fileSize));
+ writer.append(new Text(name), new LongWritable(fileSize));
} catch(Exception e) {
throw new IOException(e.getLocalizedMessage());
} finally {
@@ -154,7 +163,7 @@
super(fsConfig);
}
- void collectStats(OutputCollector<UTF8, UTF8> output,
+ void collectStats(OutputCollector<Text, Text> output,
String name,
long execTime,
Object objSize) throws IOException {
@@ -164,11 +173,16 @@
LOG.info("Exec time = " + execTime);
LOG.info("IO rate = " + ioRateMbSec);
- output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1)));
- output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize)));
- output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime)));
- output.collect(new UTF8("f:rate"), new
UTF8(String.valueOf(ioRateMbSec*1000)));
- output.collect(new UTF8("f:sqrate"), new
UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "tasks"),
+ new Text(String.valueOf(1)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "size"),
+ new Text(String.valueOf(totalSize)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_LONG + "time"),
+ new Text(String.valueOf(execTime)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "rate"),
+ new Text(String.valueOf(ioRateMbSec*1000)));
+ output.collect(new Text(AccumulatingReducer.VALUE_TYPE_FLOAT + "sqrate"),
+ new Text(String.valueOf(ioRateMbSec*ioRateMbSec*1000)));
}
}
@@ -230,8 +244,8 @@
job.setReducerClass(AccumulatingReducer.class);
FileOutputFormat.setOutputPath(job, outputDir);
- job.setOutputKeyClass(UTF8.class);
- job.setOutputValueClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
Modified: hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java (original)
+++ hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestFileSystem.java Mon May 18 18:15:29 2009
@@ -42,17 +42,9 @@
import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.UTF8;
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.security.UnixUserGroupInformation;
@@ -122,13 +114,13 @@
SequenceFile.Writer writer =
SequenceFile.createWriter(fs, conf, controlFile,
- UTF8.class, LongWritable.class, CompressionType.NONE);
+ Text.class, LongWritable.class, CompressionType.NONE);
long totalSize = 0;
long maxSize = ((megaBytes / numFiles) * 2) + 1;
try {
while (totalSize < megaBytes) {
- UTF8 name = new UTF8(Long.toString(random.nextLong()));
+ Text name = new Text(Long.toString(random.nextLong()));
long size = random.nextLong();
if (size < 0)
@@ -148,7 +140,7 @@
}
public static class WriteMapper extends Configured
- implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
+ implements Mapper<Text, LongWritable, Text, LongWritable> {
private Random random = new Random();
private byte[] buffer = new byte[BUFFER_SIZE];
@@ -175,8 +167,8 @@
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
- public void map(UTF8 key, LongWritable value,
- OutputCollector<UTF8, LongWritable> collector,
+ public void map(Text key, LongWritable value,
+ OutputCollector<Text, LongWritable> collector,
Reporter reporter)
throws IOException {
@@ -211,7 +203,7 @@
// rename to final location
fs.rename(tempFile, new Path(DATA_DIR, name));
- collector.collect(new UTF8("bytes"), new LongWritable(written));
+ collector.collect(new Text("bytes"), new LongWritable(written));
reporter.setStatus("wrote " + name);
}
@@ -237,14 +229,14 @@
job.setReducerClass(LongSumReducer.class);
FileOutputFormat.setOutputPath(job, WRITE_DIR);
- job.setOutputKeyClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
}
public static class ReadMapper extends Configured
- implements Mapper<UTF8, LongWritable, UTF8, LongWritable> {
+ implements Mapper<Text, LongWritable, Text, LongWritable> {
private Random random = new Random();
private byte[] buffer = new byte[BUFFER_SIZE];
@@ -269,8 +261,8 @@
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
- public void map(UTF8 key, LongWritable value,
- OutputCollector<UTF8, LongWritable> collector,
+ public void map(Text key, LongWritable value,
+ OutputCollector<Text, LongWritable> collector,
Reporter reporter)
throws IOException {
@@ -309,7 +301,7 @@
in.close();
}
- collector.collect(new UTF8("bytes"), new LongWritable(read));
+ collector.collect(new Text("bytes"), new LongWritable(read));
reporter.setStatus("read " + name);
}
@@ -335,7 +327,7 @@
job.setReducerClass(LongSumReducer.class);
FileOutputFormat.setOutputPath(job, READ_DIR);
- job.setOutputKeyClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
@@ -343,7 +335,7 @@
public static class SeekMapper<K> extends Configured
- implements Mapper<WritableComparable, LongWritable, K, LongWritable> {
+ implements Mapper<Text, LongWritable, K, LongWritable> {
private Random random = new Random();
private byte[] check = new byte[BUFFER_SIZE];
@@ -367,7 +359,7 @@
fastCheck = job.getBoolean("fs.test.fastCheck", false);
}
- public void map(WritableComparable key, LongWritable value,
+ public void map(Text key, LongWritable value,
OutputCollector<K, LongWritable> collector,
Reporter reporter)
throws IOException {
@@ -431,7 +423,7 @@
job.setReducerClass(LongSumReducer.class);
FileOutputFormat.setOutputPath(job, READ_DIR);
- job.setOutputKeyClass(UTF8.class);
+ job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setNumReduceTasks(1);
JobClient.runJob(job);
Modified: hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java?rev=776032&r1=776031&r2=776032&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java (original)
+++ hadoop/core/trunk/src/test/hdfs-with-mr/org/apache/hadoop/fs/TestHarFileSystem.java Mon May 18 18:15:29 2009
@@ -18,34 +18,19 @@
package org.apache.hadoop.fs;
-
import java.io.IOException;
import java.util.Iterator;
+import junit.framework.TestCase;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FsShell;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.MiniMRCluster;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.*;
import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.ToolRunner;
-import junit.framework.TestCase;
-
/**
* test the har file system
* create a har filesystem
@@ -183,14 +168,17 @@
FSDataInputStream fin = harFs.open(harFilea);
byte[] b = new byte[4];
int readBytes = fin.read(b);
+ assertTrue("Empty read.", readBytes > 0);
fin.close();
assertTrue("strings are equal ", (b[0] == "a".getBytes()[0]));
fin = harFs.open(harFileb);
- fin.read(b);
+ readBytes = fin.read(b);
+ assertTrue("Empty read.", readBytes > 0);
fin.close();
assertTrue("strings are equal ", (b[0] == "b".getBytes()[0]));
fin = harFs.open(harFilec);
- fin.read(b);
+ readBytes = fin.read(b);
+ assertTrue("Empty read.", readBytes > 0);
fin.close();
assertTrue("strings are equal ", (b[0] == "c".getBytes()[0]));
// ok all files match
@@ -214,7 +202,8 @@
Path reduceFile = status[0].getPath();
FSDataInputStream reduceIn = fs.open(reduceFile);
b = new byte[6];
- reduceIn.read(b);
+ readBytes = reduceIn.read(b);
+ assertTrue("Should read 6 bytes.", readBytes == 6);
//assuming all the 6 bytes were read.
Text readTxt = new Text(b);
assertTrue("a\nb\nc\n".equals(readTxt.toString()));