Author: cutting
Date: Mon Jul 2 22:12:43 2012
New Revision: 1356503
URL: http://svn.apache.org/viewvc?rev=1356503&view=rev
Log:
AVRO-1120. Let AvroMultipleOutput jobs use multiple schemas with map-only jobs.
Contributed by Ashish Nagavaram.
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1356503&r1=1356502&r2=1356503&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Mon Jul 2 22:12:43 2012
@@ -21,6 +21,9 @@ Avro 1.7.1 (unreleased)
IMPROVEMENTS
+ AVRO-1120. Let AvroMultipleOutput jobs use multiple schemas with
+ map-only jobs. (Ashish Nagavaram via cutting)
+
BUG FIXES
AVRO-1114. Java: Update license headers for new mapreduce code. (cutting)
Modified: avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java?rev=1356503&r1=1356502&r2=1356503&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMultipleOutputs.java Mon Jul 2 22:12:43 2012
@@ -70,7 +70,7 @@ import org.apache.hadoop.io.NullWritable
* Usage pattern for job submission:
* <pre>
*
- * Job job = new Job();
+ * JobConf job = new JobConf();
*
* FileInputFormat.setInputPath(job, inDir);
* FileOutputFormat.setOutputPath(job, outDir);
@@ -526,15 +526,19 @@ public class AvroMultipleOutputs {
public static final String CONFIG_NAMED_OUTPUT = "mo.config.namedOutput";
@SuppressWarnings({"unchecked"})
- public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String baseFileName, Progressable arg3) throws IOException
- {
+ public RecordWriter<Object, Object> getRecordWriter(FileSystem fs,JobConf job, String baseFileName, Progressable arg3) throws IOException {
String nameOutput = job.get(CONFIG_NAMED_OUTPUT, null);
String fileName = getUniqueName(job, baseFileName);
Schema schema = schemaList.get(nameOutput+"_SCHEMA");
JobConf outputConf = new JobConf(job);
outputConf.setOutputFormat(getNamedOutputFormatClass(job, nameOutput));
- if(schema!=null)
- AvroJob.setOutputSchema(outputConf,schema);
+ boolean isMapOnly = job.getNumReduceTasks() == 0;
+ if (schema != null) {
+ if (isMapOnly)
+ AvroJob.setMapOutputSchema(outputConf, schema);
+ else
+ AvroJob.setOutputSchema(outputConf, schema);
+ }
OutputFormat outputFormat = outputConf.getOutputFormat();
return outputFormat.getRecordWriter(fs, outputConf, fileName, arg3);
}
Modified: avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java?rev=1356503&r1=1356502&r2=1356503&view=diff
==============================================================================
--- avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java (original)
+++ avro/trunk/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestAvroMultipleOutputs.java Mon Jul 2 22:12:43 2012
@@ -45,19 +45,31 @@ import org.junit.Test;
public class TestAvroMultipleOutputs {
- private static final String UTF8 = "UTF-8";
+ private static final String UTF8 = "UTF-8";
public static class MapImpl extends AvroMapper<Utf8, Pair<Utf8, Long>> {
-
-
+ private AvroMultipleOutputs amos;
+
+ public void configure(JobConf Job) {
+ this.amos = new AvroMultipleOutputs(Job);
+ }
@Override
public void map(Utf8 text, AvroCollector<Pair<Utf8,Long>> collector,
Reporter reporter) throws IOException {
StringTokenizer tokens = new StringTokenizer(text.toString());
- while (tokens.hasMoreTokens())
- collector.collect(new Pair<Utf8,Long>(new Utf8(tokens.nextToken()),1L));
+ while (tokens.hasMoreTokens()) {
+ String tok = tokens.nextToken();
+ collector.collect(new Pair<Utf8,Long>(new Utf8(tok),1L));
+ amos.getCollector("myavro2",reporter)
+ .collect(new Pair<Utf8,Long>(new Utf8(tok),1L).toString());
+ }
+
+ }
+ public void close() throws IOException {
+ amos.close();
}
+
}
public static class ReduceImpl
@@ -91,6 +103,8 @@ public class TestAvroMultipleOutputs {
testJob();
testProjection();
testProjection1();
+ testJob_noreducer();
+ testProjection_noreducer();
}
@SuppressWarnings("deprecation")
@@ -118,7 +132,7 @@ public class TestAvroMultipleOutputs {
FileOutputFormat.setCompressOutput(job, false);
AvroMultipleOutputs.addNamedOutput(job,"myavro",AvroOutputFormat.class, new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());
AvroMultipleOutputs.addNamedOutput(job,"myavro1",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
-
+ AvroMultipleOutputs.addNamedOutput(job,"myavro2",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
WordCountUtil.setMeta(job);
@@ -201,4 +215,49 @@ public class TestAvroMultipleOutputs {
}
Assert.assertEquals(sumOfCounts, actualSumOfCounts);
}
+
+ @SuppressWarnings("deprecation")
+ public void testJob_noreducer() throws Exception {
+ JobConf job = new JobConf();
+ job.setNumReduceTasks(0);
+// private static final String UTF8 = "UTF-8";
+ String dir = System.getProperty("test.dir", ".") + "/mapred";
+ Path outputPath = new Path(dir + "/out");
+
+ outputPath.getFileSystem(job).delete(outputPath);
+ WordCountUtil.writeLinesFile();
+
+ job.setJobName("AvroMultipleOutputs_noreducer");
+
+ AvroJob.setInputSchema(job, Schema.create(Schema.Type.STRING));
+ AvroJob.setOutputSchema(job,
+ new Pair<Utf8,Long>(new Utf8(""), 0L).getSchema());
+
+ AvroJob.setMapperClass(job, MapImpl.class);
+
+ FileInputFormat.setInputPaths(job, new Path(dir + "/in"));
+ FileOutputFormat.setOutputPath(job, outputPath);
+ FileOutputFormat.setCompressOutput(job, false);
+ AvroMultipleOutputs.addNamedOutput(job,"myavro2",AvroOutputFormat.class, Schema.create(Schema.Type.STRING));
+ JobClient.runJob(job);
+ }
+
+ public void testProjection_noreducer() throws Exception {
+ JobConf job = new JobConf();
+ long onel = 1;
+ Schema readerSchema = Schema.create(Schema.Type.STRING);
+ AvroJob.setInputSchema(job, readerSchema);
+ String dir= System.getProperty("test.dir", ".") + "/mapred";
+ Path inputPath = new Path(dir + "/out" + "/myavro2-m-00000.avro");
+ FileStatus fileStatus = FileSystem.get(job).getFileStatus(inputPath);
+ FileSplit fileSplit = new FileSplit(inputPath, 0, fileStatus.getLen(), job);
+ AvroRecordReader<Utf8> recordReader_new = new AvroRecordReader<Utf8>(job, fileSplit);
+ AvroWrapper<Utf8> inputPair_new = new AvroWrapper<Utf8>(null);
+ NullWritable ignore = NullWritable.get();
+ long testl=0;
+ while(recordReader_new.next(inputPair_new, ignore)) {
+ testl=Long.parseLong(inputPair_new.datum().toString().split(":")[2].replace("}","").trim());
+ Assert.assertEquals(onel,testl);
+ }
+ }
}