Author: cutting
Date: Wed Apr 20 20:00:53 2011
New Revision: 1095495
URL: http://svn.apache.org/viewvc?rev=1095495&view=rev
Log:
Merge changes 1094812, 1095206, 1095207, 1095208 and 1095493 from trunk to 1.5
branch. Fixes: AVRO-802, AVRO-799, AVRO-798, AVRO-790, and AVRO-763.
Added:
avro/branches/branch-1.5/lang/java/tools/src/test/java/org/apache/avro/tool/TestTextFileTools.java
- copied unchanged from r1095207,
avro/trunk/lang/java/tools/src/test/java/org/apache/avro/tool/TestTextFileTools.java
Modified:
avro/branches/branch-1.5/ (props changed)
avro/branches/branch-1.5/CHANGES.txt
avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java
Propchange: avro/branches/branch-1.5/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 20:00:53 2011
@@ -1 +1 @@
-/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550
+/avro/trunk:1075938,1075993,1078917,1079055,1079060,1079063,1083246,1085921,1086727,1086730,1086866,1087076,1087129,1087136,1087439-1087440,1087463,1087472,1087792,1089128,1089131,1089550,1094812,1095206-1095208,1095493
Modified: avro/branches/branch-1.5/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/CHANGES.txt?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/CHANGES.txt (original)
+++ avro/branches/branch-1.5/CHANGES.txt Wed Apr 20 20:00:53 2011
@@ -30,7 +30,20 @@ Avro 1.5.1 (unreleased)
AVRO-794. Makefile.am is no longer required in C++. (thiru)
- AVRO-795. C++ Datafile reader makes it hard to build adaptive clients.
(thiru)
+ AVRO-795. C++ Datafile reader makes it hard to build adaptive
+ clients. (thiru)
+
+ AVRO-802. Java: Add documentation for non-Avro input, map-only
+ jobs. (cutting)
+
+ AVRO-799. Java: Add support for --codec parameter to the
+ 'fromtext' command. Also made some performance improvements, bug
+ fixes and added tests for this command. (cutting)
+
+ AVRO-798. Add checksum to Snappy compressed blocks. (cutting)
+
+ AVRO-763. Java MapReduce API: add support for configure() and
+ close() methods to mappers and reducers. (Marshall Pierce via cutting)
BUG FIXES
@@ -39,6 +52,9 @@ Avro 1.5.1 (unreleased)
AVRO-780. Java: Fix a NullPointerException with reflect data when
a union contains an array and null. (cutting)
+ AVRO-790. Java: GenericDatumReader can fail when reusing objects with
unions
+ containing 'bytes' fields. (scottcarey)
+
Avro 1.5.0 (10 March 2011)
INCOMPATIBLE CHANGES
Modified: avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
--- avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml (original)
+++ avro/branches/branch-1.5/doc/src/content/xdocs/spec.xml Wed Apr 20 20:00:53
2011
@@ -701,7 +701,8 @@
<title>snappy</title>
<p>The "snappy" codec uses
Google's <a href="http://code.google.com/p/snappy/">Snappy</a>
- compression library.</p>
+ compression library. Each compressed block is followed
+ by its 4-byte, big-endian CRC32 checksum.</p>
</section>
</section>
</section>
Modified:
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
(original)
+++
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/file/SnappyCodec.java
Wed Apr 20 20:00:53 2011
@@ -19,19 +19,19 @@ package org.apache.avro.file;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.zip.CRC32;
import org.xerial.snappy.Snappy;
import org.xerial.snappy.SnappyException;
/** * Implements Snappy compression and decompression. */
class SnappyCodec extends Codec {
-
- private static final SnappyCodec INSTANCE = new SnappyCodec();
+ private CRC32 crc32 = new CRC32();
static class Option extends CodecFactory {
@Override
protected Codec createInstance() {
- return INSTANCE;
+ return new SnappyCodec();
}
}
@@ -43,10 +43,15 @@ class SnappyCodec extends Codec {
ByteBuffer compress(ByteBuffer in) throws IOException {
try {
ByteBuffer out =
- ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining()));
+ ByteBuffer.allocate(Snappy.maxCompressedLength(in.remaining())+4);
int size = Snappy.compress(in.array(), in.position(), in.remaining(),
out.array(), 0);
- out.limit(size);
+ crc32.reset();
+ crc32.update(in.array(), in.position(), in.remaining());
+ out.putInt(size, (int)crc32.getValue());
+
+ out.limit(size+4);
+
return out;
} catch (SnappyException e) {
throw new IOException(e);
@@ -57,10 +62,16 @@ class SnappyCodec extends Codec {
ByteBuffer decompress(ByteBuffer in) throws IOException {
try {
ByteBuffer out = ByteBuffer.allocate
- (Snappy.uncompressedLength(in.array(), in.position(), in.remaining()));
- int size = Snappy.uncompress(in.array(), in.position(), in.remaining(),
+ (Snappy.uncompressedLength(in.array(),in.position(),in.remaining()-4));
+ int size = Snappy.uncompress(in.array(),in.position(),in.remaining()-4,
out.array(), 0);
out.limit(size);
+
+ crc32.reset();
+ crc32.update(out.array(), 0, size);
+ if (in.getInt(in.limit()-4) != (int)crc32.getValue())
+ throw new IOException("Checksum failure");
+
return out;
} catch (SnappyException e) {
throw new IOException(e);
Modified:
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
(original)
+++
avro/branches/branch-1.5/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
Wed Apr 20 20:00:53 2011
@@ -330,7 +330,7 @@ public class GenericDatumReader<D> imple
* byte array representation. By default, this calls {@link
* Decoder#readBytes(ByteBuffer)}.*/
protected Object readBytes(Object old, Decoder in) throws IOException {
- return in.readBytes((ByteBuffer)old);
+ return in.readBytes(old instanceof ByteBuffer ? (ByteBuffer) old : null);
}
/** Called to read integers. Subclasses may override to use a different
Modified:
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
(original)
+++
avro/branches/branch-1.5/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java
Wed Apr 20 20:00:53 2011
@@ -45,7 +45,6 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonDecoder;
import org.apache.avro.compiler.specific.TestSpecificCompiler;
import org.apache.avro.util.Utf8;
@@ -79,7 +78,7 @@ public class TestSchema {
+ " \"name\": \"inner_union\" }\n" + " ]\n" + "}\n";
private static final int COUNT =
- Integer.parseInt(System.getProperty("test.count", "10"));
+ Integer.parseInt(System.getProperty("test.count", "30"));
@Test
public void testNull() throws Exception {
@@ -163,6 +162,18 @@ public class TestSchema {
check("{\"type\":\"map\", \"values\":\"long\"}", "{\"a\":1}", map);
checkParseError("{\"type\":\"map\"}"); // values required
}
+
+ @Test
+ public void testUnionMap() throws Exception {
+ String unionMapSchema = "{\"name\":\"foo\", \"type\":\"record\"," +
+ " \"fields\":[ {\"name\":\"mymap\", \"type\":" +
+ " [{\"type\":\"map\", \"values\":" +
+ " [\"int\",\"long\",\"float\",\"string\"]}," +
+ " \"null\"]" +
+ " }]" +
+ " }";
+ check(unionMapSchema, true);
+ }
@Test
public void testRecord() throws Exception {
@@ -558,6 +569,7 @@ public class TestSchema {
throws Exception {
Schema schema = Schema.parse(jsonSchema);
checkProp(schema);
+ Object reuse = null;
for (Object datum : new RandomData(schema, COUNT)) {
if (induce) {
@@ -570,7 +582,10 @@ public class TestSchema {
checkBinary(schema, datum,
new GenericDatumWriter<Object>(),
- new GenericDatumReader<Object>());
+ new GenericDatumReader<Object>(), null);
+ reuse = checkBinary(schema, datum,
+ new GenericDatumWriter<Object>(),
+ new GenericDatumReader<Object>(), reuse);
checkDirectBinary(schema, datum,
new GenericDatumWriter<Object>(),
new GenericDatumReader<Object>());
@@ -603,6 +618,14 @@ public class TestSchema {
DatumWriter<Object> writer,
DatumReader<Object> reader)
throws IOException {
+ checkBinary(schema, datum, writer, reader, null);
+ }
+
+ public static Object checkBinary(Schema schema, Object datum,
+ DatumWriter<Object> writer,
+ DatumReader<Object> reader,
+ Object reuse)
+ throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
writer.setSchema(schema);
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
@@ -613,10 +636,11 @@ public class TestSchema {
reader.setSchema(schema);
Object decoded =
- reader.read(null, DecoderFactory.get().binaryDecoder(
+ reader.read(reuse, DecoderFactory.get().binaryDecoder(
data, null));
assertEquals("Decoded data does not match.", datum, decoded);
+ return decoded;
}
public static void checkDirectBinary(Schema schema, Object datum,
Modified:
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
(original)
+++
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroMapper.java
Wed Apr 20 20:00:53 2011
@@ -18,17 +18,20 @@
package org.apache.avro.mapred;
+import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
/** A mapper for Avro data.
*
* <p>Applications subclass this class and pass their subclass to {@link
- * AvroJob#setMapperClass}, overriding {@link #map}.
+ * AvroJob#setMapperClass(JobConf, Class)}, overriding {@link #map(Object,
AvroCollector, Reporter)}.
*/
-public class AvroMapper<IN,OUT> extends Configured {
+public class AvroMapper<IN, OUT> extends Configured implements
JobConfigurable, Closeable {
/** Called with each map input datum. By default, collects inputs. */
@SuppressWarnings("unchecked")
@@ -38,4 +41,15 @@ public class AvroMapper<IN,OUT> extends
}
+ /** Subclasses can override this as desired. */
+ @Override
+ public void close() throws IOException {
+ // no op
+ }
+
+ /** Subclasses can override this as desired. */
+ @Override
+ public void configure(JobConf jobConf) {
+ // no op
+ }
}
Modified:
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
(original)
+++
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/AvroReducer.java
Wed Apr 20 20:00:53 2011
@@ -18,19 +18,22 @@
package org.apache.avro.mapred;
+import java.io.Closeable;
import java.io.IOException;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
/** A reducer for Avro data.
*
* <p>Applications should subclass this class and pass their subclass to {@link
- * AvroJob#setReducerClass} and perhaps {@link AvroJob#setCombinerClass}.
- * Subclasses override {@link #reduce}.
+ * AvroJob#setReducerClass(JobConf, Class)} and perhaps {@link
AvroJob#setCombinerClass(JobConf, Class)}.
+ * Subclasses override {@link #reduce(Object, Iterable, AvroCollector,
Reporter)}.
*/
-public class AvroReducer<K,V,OUT> extends Configured {
+public class AvroReducer<K,V,OUT> extends Configured implements
JobConfigurable, Closeable {
private Pair<K,V> outputPair;
@@ -48,4 +51,15 @@ public class AvroReducer<K,V,OUT> extend
}
}
+ /** Subclasses can override this as desired. */
+ @Override
+ public void close() throws IOException {
+ // no op
+ }
+
+ /** Subclasses can override this as desired. */
+ @Override
+ public void configure(JobConf jobConf) {
+ // no op
+ }
}
Modified:
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
(original)
+++
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopMapper.java
Wed Apr 20 20:00:53 2011
@@ -21,11 +21,11 @@ package org.apache.avro.mapred;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.Mapper;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
/** Bridge between a {@link org.apache.hadoop.mapred.Mapper} and an {@link
@@ -45,6 +45,7 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
(conf.getClass(AvroJob.MAPPER, AvroMapper.class, AvroMapper.class),
conf);
this.isMapOnly = conf.getNumReduceTasks() == 0;
+ this.mapper.configure(conf);
}
@SuppressWarnings("unchecked")
@@ -80,4 +81,9 @@ class HadoopMapper<IN,OUT,K,V,KO,VO> ext
mapper.map(wrapper.datum(), out, reporter);
}
+ @Override
+ public void close() throws IOException {
+ this.mapper.close();
+ }
+
}
Modified:
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
(original)
+++
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/HadoopReducerBase.java
Wed Apr 20 20:00:53 2011
@@ -39,6 +39,7 @@ abstract class HadoopReducerBase<K,V,OUT
@Override
public void configure(JobConf conf) {
this.reducer = getReducer(conf);
+ this.reducer.configure(conf);
}
class ReduceIterable implements Iterable<V>, Iterator<V> {
@@ -60,4 +61,8 @@ abstract class HadoopReducerBase<K,V,OUT
reducer.reduce(key.datum(), reduceIterable, collector, reporter);
}
+ @Override
+ public void close() throws IOException {
+ this.reducer.close();
+ }
}
Modified:
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html
(original)
+++
avro/branches/branch-1.5/lang/java/mapred/src/main/java/org/apache/avro/mapred/package.html
Wed Apr 20 20:00:53 2011
@@ -96,5 +96,23 @@ Avro data, with map and reduce functions
</ul>
</p>
+<p>For jobs whose input is non-Avro data file and which use a
+ non-Avro {@link org.apache.hadoop.mapred.Mapper} and no reducer,
+ i.e., a <i>map-only</i> job:
+ <ul>
+ <li>Set your input file format with {@link
+ org.apache.hadoop.mapred.JobConf#setInputFormat}.</li>
+ <li>Implement {@link org.apache.hadoop.mapred.Mapper} and specify
+ your job's mapper with {@link
+ org.apache.hadoop.mapred.JobConf#setMapperClass}. The output key
+ and value type should be {@link org.apache.avro.mapred.AvroWrapper} and
+ {@link org.apache.hadoop.io.NullWritable}.</li>
+ <li>Call {@link
+ org.apache.hadoop.mapred.JobConf#setNumReduceTasks(int)} with zero.
+ <li>Call {@link org.apache.avro.mapred.AvroJob#setOutputSchema} with your
+ job's output schema.</li>
+ </ul>
+</p>
+
</body>
</html>
Modified:
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
(original)
+++
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestSequenceFileReader.java
Wed Apr 20 20:00:53 2011
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -55,7 +56,7 @@ public class TestSequenceFileReader {
private static final int COUNT =
Integer.parseInt(System.getProperty("test.count", "10"));
private static final File DIR
- = new File(System.getProperty("test.dir", "/tmp"));
+ = new File(System.getProperty("test.dir", "."));
private static final File FILE = new File(DIR, "test.seq");
private static final Schema SCHEMA
@@ -162,6 +163,45 @@ public class TestSequenceFileReader {
new SpecificDatumReader<Pair<Long,CharSequence>>()));
}
+ private static class NonAvroOnlyMapper
+ extends MapReduceBase
+ implements
Mapper<LongWritable,Text,AvroWrapper<Pair<Long,Utf8>>,NullWritable> {
+
+ public void map(LongWritable key, Text value,
+ OutputCollector<AvroWrapper<Pair<Long,Utf8>>,NullWritable>
out,
+ Reporter reporter) throws IOException {
+ out.collect(new AvroWrapper<Pair<Long,Utf8>>(new
Pair<Long,Utf8>(key.get(), new Utf8(value.toString()))),
+ NullWritable.get());
+ }
+ }
+
+ @Test
+ public void testNonAvroMapOnly() throws Exception {
+ JobConf job = new JobConf();
+ Path output = new Path(System.getProperty("test.dir",".")+"/seq-out");
+
+ output.getFileSystem(job).delete(output);
+
+
+ // configure input for non-Avro sequence file
+ job.setInputFormat(SequenceFileInputFormat.class);
+ FileInputFormat.setInputPaths(job, FILE.toURI().toString());
+
+ // use a hadoop mapper that emits Avro output
+ job.setMapperClass(NonAvroOnlyMapper.class);
+
+ // configure output for avro
+ job.setNumReduceTasks(0); // map-only
+ FileOutputFormat.setOutputPath(job, output);
+ AvroJob.setOutputSchema(job, SCHEMA);
+
+ JobClient.runJob(job);
+
+ checkFile(new DataFileReader<Pair<Long,CharSequence>>
+ (new File(output.toString()+"/part-00000.avro"),
+ new SpecificDatumReader<Pair<Long,CharSequence>>()));
+ }
+
private static class NonAvroReducer
extends MapReduceBase
implements Reducer<AvroKey<Long>,AvroValue<Utf8>,LongWritable,Text> {
Modified:
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
(original)
+++
avro/branches/branch-1.5/lang/java/mapred/src/test/java/org/apache/avro/mapred/TestWeather.java
Wed Apr 20 20:00:53 2011
@@ -20,6 +20,7 @@ package org.apache.avro.mapred;
import java.io.IOException;
import java.io.File;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
@@ -34,15 +35,31 @@ import org.apache.avro.io.DatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.file.DataFileReader;
import static org.apache.avro.file.DataFileConstants.SNAPPY_CODEC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import org.junit.After;
import org.junit.Test;
-import static org.junit.Assert.*;
import test.Weather;
/** Tests mapred API with a specific record. */
public class TestWeather {
+ private static final AtomicInteger mapCloseCalls = new AtomicInteger();
+ private static final AtomicInteger mapConfigureCalls = new AtomicInteger();
+ private static final AtomicInteger reducerCloseCalls = new AtomicInteger();
+ private static final AtomicInteger reducerConfigureCalls = new
AtomicInteger();
+
+
+ @After
+ public void tearDown() {
+ mapCloseCalls.set(0);
+ mapConfigureCalls.set(0);
+ reducerCloseCalls.set(0);
+ reducerConfigureCalls.set(0);
+ }
+
/** Uses default mapper with no reduces for a map-only identity job. */
@Test
@SuppressWarnings("deprecation")
@@ -64,7 +81,7 @@ public class TestWeather {
FileOutputFormat.setCompressOutput(job, true);
job.setNumReduceTasks(0); // map-only
-
+
JobClient.runJob(job);
// check output is correct
@@ -88,8 +105,18 @@ public class TestWeather {
Reporter reporter) throws IOException {
collector.collect(new Pair<Weather,Void>(w, (Void)null));
}
+
+ @Override
+ public void close() throws IOException {
+ mapCloseCalls.incrementAndGet();
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ mapConfigureCalls.incrementAndGet();
+ }
}
-
+
// output keys only, since values are empty
public static class SortReducer
extends AvroReducer<Weather, Void, Weather> {
@@ -99,7 +126,17 @@ public class TestWeather {
Reporter reporter) throws IOException {
collector.collect(w);
}
- }
+
+ @Override
+ public void close() throws IOException {
+ reducerCloseCalls.incrementAndGet();
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ reducerConfigureCalls.incrementAndGet();
+ }
+ }
@Test
@SuppressWarnings("deprecation")
@@ -140,6 +177,15 @@ public class TestWeather {
check.close();
sorted.close();
+
+ // check that AvroMapper and AvroReducer get close() and configure() called
+ assertEquals(1, mapCloseCalls.get());
+ assertEquals(1, reducerCloseCalls.get());
+ // gets called twice for some reason, so loosen this check
+ assertTrue(mapConfigureCalls.get() >= 1);
+ assertTrue(reducerConfigureCalls.get() >= 1);
+
+
}
Modified:
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java
(original)
+++
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/FromTextTool.java
Wed Apr 20 20:00:53 2011
@@ -19,9 +19,7 @@ package org.apache.avro.tool;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
-import java.io.BufferedReader;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.List;
@@ -34,6 +32,7 @@ import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
/** Reads a text file into an Avro data file.
*
@@ -61,36 +60,68 @@ public class FromTextTool implements Too
OptionSpec<Integer> level = p.accepts("level", "compression level")
.withOptionalArg().ofType(Integer.class);
+ OptionSpec<String> codec = p.accepts("codec", "compression codec")
+ .withOptionalArg().ofType(String.class);
+
OptionSet opts = p.parse(args.toArray(new String[0]));
- if (opts.nonOptionArguments().size() != 2) {
+ List<String> nargs = opts.nonOptionArguments();
+ if (nargs.size() != 2) {
err.println("Expected 2 args: from_file to_file (local filenames," +
" Hadoop URI's, or '-' for stdin/stdout");
p.printHelpOn(err);
return 1;
}
- BufferedInputStream inStream = Util.fileOrStdin(args.get(0), stdin);
- BufferedOutputStream outStream = Util.fileOrStdout(args.get(1), out);
-
int compressionLevel = 1; // Default compression level
if (opts.hasArgument(level)) {
compressionLevel = level.value(opts);
}
- BufferedReader reader = new BufferedReader(new
InputStreamReader(inStream));
+ String codecName = opts.hasArgument(codec)
+ ? codec.value(opts)
+ : DEFLATE_CODEC;
+ CodecFactory codecFactory = codecName.equals(DEFLATE_CODEC)
+ ? CodecFactory.deflateCodec(compressionLevel)
+ : CodecFactory.fromString(codecName);
+
+ BufferedInputStream inStream = Util.fileOrStdin(nargs.get(0), stdin);
+ BufferedOutputStream outStream = Util.fileOrStdout(nargs.get(1), out);
+
DataFileWriter<ByteBuffer> writer =
new DataFileWriter<ByteBuffer>(new GenericDatumWriter<ByteBuffer>());
- writer.setCodec(CodecFactory.deflateCodec(compressionLevel));
+ writer.setCodec(codecFactory);
writer.create(Schema.parse(TEXT_FILE_SCHEMA), outStream);
- String line;
- while((line = reader.readLine()) != null) {
- ByteBuffer buff = ByteBuffer.wrap(line.getBytes());
- writer.append(buff);
+ ByteBuffer line = ByteBuffer.allocate(128);
+ boolean returnSeen = false;
+ byte[] buf = new byte[8192];
+ for (int end = inStream.read(buf); end != -1; end = inStream.read(buf)) {
+ for (int i = 0; i < end; i++) {
+ int b = buf[i] & 0xFF;
+ if (b == '\n') { // newline
+ System.out.println("Writing line = "+line.position());
+ line.flip();
+ writer.append(line);
+ line.clear();
+ returnSeen = false;
+ } else if (b == '\r') { // return
+ line.flip();
+ writer.append(line);
+ line.clear();
+ returnSeen = true;
+ } else {
+ if (line.position() == line.limit()) { // reallocate longer line
+ ByteBuffer tempLine = ByteBuffer.allocate(line.limit()*2);
+ line.flip();
+ tempLine.put(line);
+ line = tempLine;
+ }
+ line.put((byte)b);
+ returnSeen = false;
+ }
+ }
}
-
- writer.flush();
writer.close();
inStream.close();
return 0;
Modified:
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java
URL:
http://svn.apache.org/viewvc/avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java?rev=1095495&r1=1095494&r2=1095495&view=diff
==============================================================================
---
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java
(original)
+++
avro/branches/branch-1.5/lang/java/tools/src/main/java/org/apache/avro/tool/ToTextTool.java
Wed Apr 20 20:00:53 2011
@@ -35,7 +35,7 @@ import org.apache.avro.generic.GenericDa
public class ToTextTool implements Tool {
private static final String TEXT_FILE_SCHEMA =
"\"bytes\"";
- private static final byte[] LINE_SEPERATOR =
+ private static final byte[] LINE_SEPARATOR =
System.getProperty("line.separator").getBytes();
@Override
@@ -45,7 +45,7 @@ public class ToTextTool implements Tool
@Override
public String getShortDescription() {
- return "Converts and avro file to a text file.";
+ return "Converts an Avro data file to a text file.";
}
@Override
@@ -77,7 +77,7 @@ public class ToTextTool implements Tool
while (fileReader.hasNext()) {
ByteBuffer outBuff = (ByteBuffer) fileReader.next();
outStream.write(outBuff.array());
- outStream.write(LINE_SEPERATOR);
+ outStream.write(LINE_SEPARATOR);
}
outStream.close();