Author: cutting
Date: Thu Dec 31 17:01:58 2009
New Revision: 894889
URL: http://svn.apache.org/viewvc?rev=894889&view=rev
Log:
Revised data file format and Java API.
Added:
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java
  - copied, changed from r891421, hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
Modified:
hadoop/avro/trunk/CHANGES.txt
hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java
hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java
hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java
Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Thu Dec 31 17:01:58 2009
@@ -20,6 +20,10 @@
inherited methods in protocols. Finally, Java shorts are
supported as integers. (cutting)
+ AVRO-160. Revised data file format and Java API. Simplified
+ format now permits streaming but no longer supports multiple
+ schemas per file. Java API for reading is iterator-based.
+
NEW FEATURES
AVRO-151. Validating Avro schema parser for C (massie)
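
For illustration only (not part of the patch): a minimal sketch of the
iterator-based reading API described by the AVRO-160 entry above. The class
name and input path are hypothetical.

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;

    public class ReadExample {
      public static void main(String[] args) throws IOException {
        // Open a data file; the reader iterates over its entries.
        DataFileReader<Object> reader =
          new DataFileReader<Object>(new File(args[0]),
                                     new GenericDatumReader<Object>());
        try {
          for (Object datum : reader)        // iterator-based API
            System.out.println(datum);
        } finally {
          reader.close();
        }
      }
    }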
Modified: hadoop/avro/trunk/src/doc/content/xdocs/spec.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/doc/content/xdocs/spec.xml?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/doc/content/xdocs/spec.xml (original)
+++ hadoop/avro/trunk/src/doc/content/xdocs/spec.xml Thu Dec 31 17:01:58 2009
@@ -592,66 +592,53 @@
<p>A file consists of:</p>
<ul>
- <li>A <em>header, followed by</em></li>
- <li>one or more <em>blocks</em>.</li>
+ <li>A <em>file header</em>, followed by</li>
+ <li>one or more <em>file data blocks</em>.</li>
</ul>
- <p>There are two kinds of blocks, <em>normal</em>
- and <em>metadata</em>. All files must contain at least one
- metadata block. A file terminates with its last metadata
- block. Any data after the last metadata block is ignored.</p>
- <p>A header consists of:</p>
+ <p>A file header consists of:</p>
<ul>
- <li>Four bytes, ASCII 'O', 'b', 'j', followed by zero.</li>
- <li>A 16-byte sync marker.</li>
+ <li>Four bytes, ASCII 'O', 'b', 'j', followed by 1.</li>
+ <li><em>file metadata</em>, including the schema.</li>
+ <li>The 16-byte, randomly-generated sync marker for this file.</li>
</ul>
- <p>A metadata block consists of:</p>
+ <p>File metadata consists of:</p>
<ul>
- <li>The file's 16-byte sync marker.</li>
- <li>A long with value -1, identifying this as a metadata block.</li>
- <li>A long indicating the size in bytes of this block.</li>
<li>A long indicating the number of metadata key/value pairs.</li>
<li>For each pair, a string key and bytes value.</li>
- <li>The size in bytes of this block as a 4-byte big-endian integer.
- <p>When a file is closed normally, this terminates the file
- and permits one to efficiently seek to the start of the
- metadata. If the sync marker there does not match that at
- the start of the file, then one must scan for the last
- metadata in the file.</p>
- </li>
</ul>
- <p>The following metadata properties are reserved:</p>
+ <p>The following file metadata properties are reserved:</p>
<ul>
<li><strong>schema</strong> contains the schema of objects
- stored in the file, as a string.</li>
- <li><strong>count</strong> contains the number of objects in
- the file as a decimal ASCII string.</li>
+ stored in the file, as JSON data (required).</li>
<li><strong>codec</strong> the name of the compression codec
used to compress blocks, as a string. The only value for codec
currently supported is "null" (meaning no compression is
performed). If codec is absent, it is assumed to be
"null".</li>
- <li><strong>sync</strong> the 16-byte sync marker used in this file,
- as a byte sequence.</li>
</ul>
- <p>A normal block consists of:</p>
+ <p>A file header is thus described by the following schema:</p>
+ <source>
+{"type": "record", "name": "org.apache.avro.file.Header",
+ "fields" : [
+ {"name": "magic", "type": {"type": "fixed", "size": 4}}
+ {"name": "meta", "type": {"type": "map", "values": "bytes"}}
+ {"name": "sync", "type": {"type": "fixed", "size": 16}}
+ ]
+}
+ </source>
+
+ <p>A file data block consists of:</p>
<ul>
- <li>The file's 16-byte sync marker.</li>
- <li>A long indicating the size in bytes of this block in the file.</li>
+ <li>A long indicating the count of objects in this block.</li>
<li>The serialized objects. If a codec is specified, this is
compressed by that codec.</li>
+ <li>The file's 16-byte sync marker.</li>
</ul>
- <p>Note that this format supports appends, since multiple
- metadata blocks are permitted.</p>
-
- <p>To be robust to application failure, implementations can
- write metadata periodically to limit the amount of the file that
- must be scanned to find the last metadata block.</p>
-
</section>
<section>
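
For illustration only (not part of the patch): a sketch of decoding the file
header laid out above using Avro's BinaryDecoder, mirroring what the new
DataFileStream constructor does; error handling and codec validation are
omitted.

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.avro.io.BinaryDecoder;

    public class HeaderSketch {
      public static void main(String[] args) throws IOException {
        BinaryDecoder vin = new BinaryDecoder(
            new BufferedInputStream(new FileInputStream(args[0])));
        byte[] magic = new byte[4];
        vin.readFixed(magic);                  // 'O', 'b', 'j', 1
        Map<String,byte[]> meta = new HashMap<String,byte[]>();
        for (long n = vin.readMapStart(); n != 0; n = vin.mapNext()) {
          for (long i = 0; i < n; i++) {       // string key, bytes value
            String key = vin.readString(null).toString();
            ByteBuffer buf = vin.readBytes(null);
            byte[] value = new byte[buf.remaining()];
            buf.get(value);
            meta.put(key, value);
          }
        }
        byte[] sync = new byte[16];
        vin.readFixed(sync);                   // this file's sync marker
      }
    }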
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileConstants.java Thu Dec 31 17:01:58 2009
@@ -22,7 +22,7 @@
* Constants used in data files.
*/
class DataFileConstants {
- public static final byte VERSION = 0;
+ public static final byte VERSION = 1;
public static final byte[] MAGIC = new byte[] {
(byte)'O', (byte)'b', (byte)'j', VERSION
};
@@ -31,7 +31,6 @@
public static final int SYNC_INTERVAL = 1000*SYNC_SIZE;
public static final String SCHEMA = "schema";
- public static final String SYNC = "sync";
public static final String COUNT = "count";
public static final String CODEC = "codec";
public static final String NULL_CODEC = "null";
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java Thu Dec 31 17:01:58 2009
@@ -19,35 +19,19 @@
import java.io.BufferedInputStream;
import java.io.IOException;
+import java.io.EOFException;
import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.Decoder;
-import org.apache.avro.io.BinaryDecoder;
+import static org.apache.avro.file.DataFileConstants.SYNC_SIZE;
-/** Read files written by {@link DataFileWriter}.
+/** Random access to files written with {@link DataFileWriter}.
* @see DataFileWriter
*/
-public class DataFileReader<D> {
-
- private Schema schema;
- private DatumReader<D> reader;
- private SeekableBufferedInput in;
- private Decoder vin;
-
- Map<String,byte[]> meta = new HashMap<String,byte[]>();
-
- private long count; // # entries in file
- private long blockCount; // # entries in block
- byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
- private byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
+public class DataFileReader<D> extends DataFileStream<D> {
+ private SeekableBufferedInput sin;
+ private long blockStart;
/** Construct a reader for a file. */
public DataFileReader(File file, DatumReader<D> reader) throws IOException {
@@ -57,133 +41,59 @@
/** Construct a reader for a file. */
public DataFileReader(SeekableInput sin, DatumReader<D> reader)
throws IOException {
- this.in = new SeekableBufferedInput(sin);
-
- byte[] magic = new byte[4];
- in.read(magic);
- if (!Arrays.equals(DataFileConstants.MAGIC, magic))
- throw new IOException("Not a data file.");
-
- long length = in.length();
- in.seek(length-4);
- int footerSize=(in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
- in.seek(length-footerSize);
- this.vin = new BinaryDecoder(in);
- long l = vin.readMapStart();
- if (l > 0) {
- do {
- for (long i = 0; i < l; i++) {
- String key = vin.readString(null).toString();
- ByteBuffer value = vin.readBytes(null);
- byte[] bb = new byte[value.remaining()];
- value.get(bb);
- meta.put(key, bb);
- }
- } while ((l = vin.mapNext()) != 0);
- }
-
- this.sync = getMeta(DataFileConstants.SYNC);
- this.count = getMetaLong(DataFileConstants.COUNT);
- String codec = getMetaString(DataFileConstants.CODEC);
- if (codec != null && ! codec.equals(DataFileConstants.NULL_CODEC)) {
- throw new IOException("Unknown codec: " + codec);
- }
- this.schema = Schema.parse(getMetaString(DataFileConstants.SCHEMA));
- this.reader = reader;
-
- reader.setSchema(schema);
-
- in.seek(DataFileConstants.MAGIC.length); // seek to start
- }
-
- /** Return the schema used in this file. */
- public Schema getSchema() { return schema; }
-
- /** Return the number of records in the file. */
- public long getCount() { return count; }
-
- /** Return the value of a metadata property. */
- public synchronized byte[] getMeta(String key) {
- return meta.get(key);
- }
- /** Return the value of a metadata property. */
- public synchronized String getMetaString(String key) {
- byte[] value = getMeta(key);
- if (value == null) {
- return null;
- }
- try {
- return new String(value, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
- /** Return the value of a metadata property. */
- public synchronized long getMetaLong(String key) {
- return Long.parseLong(getMetaString(key));
+ super(new SeekableBufferedInput(sin), reader);
+ this.sin = (SeekableBufferedInput)in;
}
- /** Return the next datum in the file. */
- public synchronized D next(D reuse) throws IOException {
- while (blockCount == 0) { // at start of block
-
- if (in.tell() == in.length()) // at eof
- return null;
-
- skipSync(); // skip a sync
-
- blockCount = vin.readLong(); // read blockCount
-
- if (blockCount == DataFileConstants.FOOTER_BLOCK) {
- in.seek(vin.readLong()+in.tell()); // skip a footer
- blockCount = 0;
- }
- }
- blockCount--;
- return reader.read(reuse, vin);
- }
-
- private void skipSync() throws IOException {
- vin.readFixed(syncBuffer);
- if (!Arrays.equals(syncBuffer, sync))
- throw new IOException("Invalid sync!");
- }
-
-  /** Move to the specified synchronization point, as returned by {@link
- * DataFileWriter#sync()}. */
- public synchronized void seek(long position) throws IOException {
- in.seek(position);
- blockCount = 0;
- }
-
- /** Move to the next synchronization point after a position. */
- public synchronized void sync(long position) throws IOException {
- if (in.tell()+DataFileConstants.SYNC_SIZE >= in.length()) {
- in.seek(in.length());
+  /** Move to a specific, known synchronization point, one returned from {@link
+   * DataFileWriter#sync()} while writing. If synchronization points were not
+   * saved while writing a file, use {@link #sync(long)} instead. */
+ public void seek(long position) throws IOException {
+ sin.seek(position);
+ blockRemaining = 0;
+ blockStart = position;
+ }
+
+ /** Move to the next synchronization point after a position. To process a
+   * range of file entries, call this with the starting position, then check
+   * {@link #pastSync(long)} with the end point before each call to {@link
+ * #next()}. */
+ public void sync(long position) throws IOException {
+ seek(position);
+ try {
+ vin.readFixed(syncBuffer);
+ } catch (EOFException e) {
+ blockStart = sin.tell();
return;
}
- in.seek(position);
- vin.readFixed(syncBuffer);
- for (int i = 0; in.tell() < in.length(); i++) {
+ int i=0, b;
+ do {
int j = 0;
- for (; j < sync.length; j++) {
- if (sync[j] != syncBuffer[(i+j)%sync.length])
+ for (; j < SYNC_SIZE; j++) {
+ if (sync[j] != syncBuffer[(i+j)%SYNC_SIZE])
break;
}
- if (j == sync.length) { // position before sync
- in.seek(in.tell() - DataFileConstants.SYNC_SIZE);
+ if (j == SYNC_SIZE) { // matched a complete sync
+ blockStart = position + i + SYNC_SIZE;
return;
}
- syncBuffer[i%sync.length] = (byte)in.read();
- }
+ b = in.read();
+ syncBuffer[i++%SYNC_SIZE] = (byte)b;
+ } while (b != -1);
+ }
+
+ @Override
+ void skipSync() throws IOException { // note block start
+ super.skipSync();
+ blockStart = sin.tell();
}
- /** Close this reader. */
- public synchronized void close() throws IOException {
- in.close();
+ /** Return true if past the next synchronization point after a position. */
+ public boolean pastSync(long position) {
+ return blockStart >= Math.min(sin.length(), position+SYNC_SIZE);
}
- private class SeekableBufferedInput extends BufferedInputStream {
+ private static class SeekableBufferedInput extends BufferedInputStream {
private long position; // end of buffer
private long length; // file length
@@ -220,7 +130,7 @@
}
public long tell() { return position-(count-pos); }
- public long length() throws IOException { return length; }
+ public long length() { return length; }
public int read() throws IOException { // optimized implementation
if (pos >= count) return super.read();
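
For illustration only (not part of the patch): a sketch of processing one
split [start, end) of a file with the sync()/pastSync() pattern described in
the javadoc above; the class and method names are hypothetical.

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;

    public class SplitSketch {
      static long countSplit(File file, long start, long end)
          throws IOException {
        DataFileReader<Object> reader =
          new DataFileReader<Object>(file, new GenericDatumReader<Object>());
        try {
          long count = 0;
          reader.sync(start);               // first sync point after start
          while (!reader.pastSync(end)) {   // stop once past end's sync
            reader.next();
            count++;
          }
          return count;
        } finally {
          reader.close();
        }
      }
    }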
Copied: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java (from r891421, hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java)
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java?p2=hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java&p1=hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java&r1=891421&r2=894889&rev=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileReader.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileStream.java Thu Dec 31 17:01:58 2009
@@ -17,59 +17,59 @@
*/
package org.apache.avro.file;
-import java.io.BufferedInputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
-import java.io.File;
import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
+import org.apache.avro.AvroRuntimeException;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.BinaryDecoder;
-/** Read files written by {@link DataFileWriter}.
+/** Streaming access to files written by {@link DataFileWriter}. Use {@link
+ * DataFileReader} for file-based input.
* @see DataFileWriter
*/
-public class DataFileReader<D> {
+public class DataFileStream<D> implements Iterator<D>, Iterable<D> {
private Schema schema;
private DatumReader<D> reader;
- private SeekableBufferedInput in;
- private Decoder vin;
+
+ final InputStream in;
+ final Decoder vin;
Map<String,byte[]> meta = new HashMap<String,byte[]>();
- private long count; // # entries in file
- private long blockCount; // # entries in block
+ long blockRemaining; // # entries remaining in block
byte[] sync = new byte[DataFileConstants.SYNC_SIZE];
- private byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
-
- /** Construct a reader for a file. */
- public DataFileReader(File file, DatumReader<D> reader) throws IOException {
- this(new SeekableFileInput(file), reader);
- }
+ byte[] syncBuffer = new byte[DataFileConstants.SYNC_SIZE];
- /** Construct a reader for a file. */
- public DataFileReader(SeekableInput sin, DatumReader<D> reader)
+  /** Construct a reader for an input stream. For file-based input, use {@link
+   * DataFileReader}. This performs no buffering; for good performance, be
+   * sure to pass in a {@link java.io.BufferedInputStream}. */
+ public DataFileStream(InputStream in, DatumReader<D> reader)
throws IOException {
- this.in = new SeekableBufferedInput(sin);
+ this.in = in;
+ this.vin = new BinaryDecoder(in);
- byte[] magic = new byte[4];
- in.read(magic);
+ byte[] magic = new byte[DataFileConstants.MAGIC.length];
+ try {
+ vin.readFixed(magic); // read magic
+ } catch (IOException e) {
+ throw new IOException("Not a data file.");
+ }
if (!Arrays.equals(DataFileConstants.MAGIC, magic))
throw new IOException("Not a data file.");
- long length = in.length();
- in.seek(length-4);
- int footerSize=(in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
- in.seek(length-footerSize);
- this.vin = new BinaryDecoder(in);
- long l = vin.readMapStart();
+ long l = vin.readMapStart(); // read meta data
if (l > 0) {
do {
for (long i = 0; i < l; i++) {
@@ -81,9 +81,8 @@
}
} while ((l = vin.mapNext()) != 0);
}
+ vin.readFixed(sync); // read sync
- this.sync = getMeta(DataFileConstants.SYNC);
- this.count = getMetaLong(DataFileConstants.COUNT);
String codec = getMetaString(DataFileConstants.CODEC);
if (codec != null && ! codec.equals(DataFileConstants.NULL_CODEC)) {
throw new IOException("Unknown codec: " + codec);
@@ -92,22 +91,17 @@
this.reader = reader;
reader.setSchema(schema);
-
- in.seek(DataFileConstants.MAGIC.length); // seek to start
}
/** Return the schema used in this file. */
public Schema getSchema() { return schema; }
- /** Return the number of records in the file. */
- public long getCount() { return count; }
-
/** Return the value of a metadata property. */
- public synchronized byte[] getMeta(String key) {
+ public byte[] getMeta(String key) {
return meta.get(key);
}
/** Return the value of a metadata property. */
- public synchronized String getMetaString(String key) {
+ public String getMetaString(String key) {
byte[] value = getMeta(key);
if (value == null) {
return null;
@@ -119,121 +113,65 @@
}
}
/** Return the value of a metadata property. */
- public synchronized long getMetaLong(String key) {
+ public long getMetaLong(String key) {
return Long.parseLong(getMetaString(key));
}
- /** Return the next datum in the file. */
- public synchronized D next(D reuse) throws IOException {
- while (blockCount == 0) { // at start of block
+ /** Returns an iterator over entries in this file. Note that this iterator
+ * is shared with other users of the file: it does not contain a separate
+ * pointer into the file. */
+ public Iterator<D> iterator() { return this; }
- if (in.tell() == in.length()) // at eof
- return null;
-
- skipSync(); // skip a sync
-
- blockCount = vin.readLong(); // read blockCount
-
- if (blockCount == DataFileConstants.FOOTER_BLOCK) {
- in.seek(vin.readLong()+in.tell()); // skip a footer
- blockCount = 0;
- }
+ /** True if more entries remain in this file. */
+ public boolean hasNext() {
+ try {
+ if (blockRemaining == 0)
+ blockRemaining = vin.readLong(); // read block count
+ return blockRemaining != 0;
+ } catch (EOFException e) { // at EOF
+ return false;
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
}
- blockCount--;
- return reader.read(reuse, vin);
}
- private void skipSync() throws IOException {
- vin.readFixed(syncBuffer);
- if (!Arrays.equals(syncBuffer, sync))
- throw new IOException("Invalid sync!");
+ /** Read the next datum in the file.
+ * @throws NoSuchElementException if no more remain in the file.
+ */
+ public D next() {
+ try {
+ return next(null);
+ } catch (IOException e) {
+ throw new AvroRuntimeException(e);
+ }
}
-  /** Move to the specified synchronization point, as returned by {@link
- * DataFileWriter#sync()}. */
- public synchronized void seek(long position) throws IOException {
- in.seek(position);
- blockCount = 0;
+ /** Read the next datum from the file.
+ * @param reuse an instance to reuse.
+ * @throws NoSuchElementException if no more remain in the file.
+ */
+ public D next(D reuse) throws IOException {
+ if (!hasNext())
+ throw new NoSuchElementException();
+ D result = reader.read(reuse, vin);
+ if (--blockRemaining == 0)
+ skipSync();
+ return result;
}
- /** Move to the next synchronization point after a position. */
- public synchronized void sync(long position) throws IOException {
- if (in.tell()+DataFileConstants.SYNC_SIZE >= in.length()) {
- in.seek(in.length());
- return;
- }
- in.seek(position);
+ void skipSync() throws IOException {
vin.readFixed(syncBuffer);
- for (int i = 0; in.tell() < in.length(); i++) {
- int j = 0;
- for (; j < sync.length; j++) {
- if (sync[j] != syncBuffer[(i+j)%sync.length])
- break;
- }
- if (j == sync.length) { // position before sync
- in.seek(in.tell() - DataFileConstants.SYNC_SIZE);
- return;
- }
- syncBuffer[i%sync.length] = (byte)in.read();
- }
+ if (!Arrays.equals(syncBuffer, sync))
+ throw new IOException("Invalid sync!");
}
+ /** Not supported. */
+ public void remove() { throw new UnsupportedOperationException(); }
+
/** Close this reader. */
- public synchronized void close() throws IOException {
+ public void close() throws IOException {
in.close();
}
- private class SeekableBufferedInput extends BufferedInputStream {
- private long position; // end of buffer
- private long length; // file length
-
- private class PositionFilter extends InputStream {
- private SeekableInput in;
- public PositionFilter(SeekableInput in) throws IOException {
- this.in = in;
- }
- public int read() { throw new UnsupportedOperationException(); }
- public int read(byte[] b, int off, int len) throws IOException {
- int value = in.read(b, off, len);
- if (value > 0) position += value; // update on read
- return value;
- }
- }
-
- public SeekableBufferedInput(SeekableInput in) throws IOException {
- super(null);
- this.in = new PositionFilter(in);
- this.length = in.length();
- }
-
- public void seek(long p) throws IOException {
- if (p < 0) throw new IOException("Illegal seek: "+p);
- long start = position - count;
- if (p >= start && p < position) { // in buffer
- this.pos = (int)(p - start);
- } else { // not in buffer
- this.pos = 0;
- this.count = 0;
- ((PositionFilter)in).in.seek(p);
- this.position = p;
- }
- }
-
- public long tell() { return position-(count-pos); }
- public long length() throws IOException { return length; }
-
- public int read() throws IOException { // optimized implementation
- if (pos >= count) return super.read();
- return buf[pos++] & 0xff;
- }
-
-  public long skip(long skip) throws IOException { // optimized implementation
- if (skip > count-pos)
- return super.skip(skip);
- pos += (int)skip;
- return skip;
- }
- }
-
}
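
For illustration only (not part of the patch): a sketch of streaming reads
over a non-seekable stream. As the javadoc above notes, DataFileStream does
no buffering itself, so the input is wrapped in a BufferedInputStream; the
class name, method name, and stream source are hypothetical.

    import java.io.BufferedInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;

    public class StreamSketch {
      static void dump(InputStream in) throws IOException {  // e.g. a socket
        DataFileStream<Object> stream =
          new DataFileStream<Object>(new BufferedInputStream(in),
                                     new GenericDatumReader<Object>());
        try {
          while (stream.hasNext())
            System.out.println(stream.next(null));  // reuse-capable variant
        } finally {
          stream.close();
        }
      }
    }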
Modified: hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/file/DataFileWriter.java Thu Dec 31 17:01:58 2009
@@ -36,7 +36,6 @@
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
-import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
@@ -60,9 +59,8 @@
private BufferedFileOutputStream out;
private Encoder vout;
- private Map<String,byte[]> meta = new HashMap<String,byte[]>();
+ private final Map<String,byte[]> meta = new HashMap<String,byte[]>();
- private long count; // # entries in file
private int blockCount; // # entries in current block
private ByteArrayOutputStream buffer =
@@ -71,29 +69,55 @@
private byte[] sync; // 16 random bytes
- /** Construct a writer to a new file for data matching a schema. */
- public DataFileWriter(Schema schema, File file,
- DatumWriter<D> dout) throws IOException {
- this(schema, new FileOutputStream(file), dout);
- }
- /** Construct a writer to a new file for data matching a schema. */
- public DataFileWriter(Schema schema, OutputStream outs,
- DatumWriter<D> dout) throws IOException {
+ private boolean isOpen;
+
+ /** Construct a writer, not yet open. */
+ public DataFileWriter(DatumWriter<D> dout) {
+ this.dout = dout;
+ }
+
+ private void assertOpen() {
+ if (!isOpen) throw new AvroRuntimeException("not open");
+ }
+ private void assertNotOpen() {
+ if (isOpen) throw new AvroRuntimeException("already open");
+ }
+
+ /** Open a new file for data matching a schema. */
+  public DataFileWriter<D> create(Schema schema, File file) throws IOException {
+ return create(schema, new FileOutputStream(file));
+ }
+
+ /** Open a new file for data matching a schema. */
+ public DataFileWriter<D> create(Schema schema, OutputStream outs)
+ throws IOException {
+ assertNotOpen();
+
this.schema = schema;
+ setMeta(DataFileConstants.SCHEMA, schema.toString());
this.sync = generateSync();
- setMeta(DataFileConstants.SYNC, sync);
- setMeta(DataFileConstants.SCHEMA, schema.toString());
- setMeta(DataFileConstants.CODEC, DataFileConstants.NULL_CODEC);
+ init(outs);
+
+ out.write(DataFileConstants.MAGIC); // write magic
+
+ vout.writeMapStart(); // write metadata
+ vout.setItemCount(meta.size());
+ for (Map.Entry<String,byte[]> entry : meta.entrySet()) {
+ vout.startItem();
+ vout.writeString(entry.getKey());
+ vout.writeBytes(entry.getValue());
+ }
+ vout.writeMapEnd();
- init(outs, dout);
+ out.write(sync); // write initial sync
- out.write(DataFileConstants.MAGIC);
+ return this;
}
-
- /** Construct a writer appending to an existing file. */
- public DataFileWriter(File file, DatumWriter<D> dout)
- throws IOException {
+
+ /** Open a writer appending to an existing file. */
+ public DataFileWriter<D> appendTo(File file) throws IOException {
+ assertNotOpen();
if (!file.exists())
throw new FileNotFoundException("Not found: "+file);
RandomAccessFile raf = new RandomAccessFile(file, "rw");
@@ -103,21 +127,21 @@
new GenericDatumReader<D>());
this.schema = reader.getSchema();
this.sync = reader.sync;
- this.count = reader.getCount();
this.meta.putAll(reader.meta);
FileChannel channel = raf.getChannel(); // seek to end
channel.position(channel.size());
- init(new FileOutputStream(fd), dout);
+ init(new FileOutputStream(fd));
+
+ return this;
}
-
- private void init(OutputStream outs, DatumWriter<D> dout)
- throws IOException {
+
+ private void init(OutputStream outs) throws IOException {
this.out = new BufferedFileOutputStream(outs);
this.vout = new BinaryEncoder(out);
- this.dout = dout;
dout.setSchema(schema);
+ this.isOpen = true;
}
private static byte[] generateSync() {
@@ -132,92 +156,63 @@
}
/** Set a metadata property. */
- public synchronized void setMeta(String key, byte[] value) {
- meta.put(key, value);
- }
+ public DataFileWriter<D> setMeta(String key, byte[] value) {
+ assertNotOpen();
+ meta.put(key, value);
+ return this;
+ }
/** Set a metadata property. */
- public synchronized void setMeta(String key, String value) {
- try {
- setMeta(key, value.getBytes("UTF-8"));
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
+ public DataFileWriter<D> setMeta(String key, String value) {
+ try {
+ return setMeta(key, value.getBytes("UTF-8"));
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
}
+ }
/** Set a metadata property. */
- public synchronized void setMeta(String key, long value) {
- setMeta(key, Long.toString(value));
- }
-
- /** If the schema for this file is a union, add a branch to it. */
- public synchronized void addSchema(Schema branch) {
- if (schema.getType() != Schema.Type.UNION)
- throw new AvroRuntimeException("Not a union schema: "+schema);
- List<Schema> types = schema.getTypes();
- types.add(branch);
- this.schema = Schema.createUnion(types);
- this.dout.setSchema(schema);
- setMeta(DataFileConstants.SCHEMA, schema.toString());
+ public DataFileWriter<D> setMeta(String key, long value) {
+ return setMeta(key, Long.toString(value));
}
/** Append a datum to the file. */
- public synchronized void append(D datum) throws IOException {
- dout.write(datum, bufOut);
- blockCount++;
- count++;
- if (buffer.size() >= DataFileConstants.SYNC_INTERVAL)
- writeBlock();
- }
+ public void append(D datum) throws IOException {
+ assertOpen();
+ dout.write(datum, bufOut);
+ blockCount++;
+ if (buffer.size() >= DataFileConstants.SYNC_INTERVAL)
+ writeBlock();
+ }
private void writeBlock() throws IOException {
if (blockCount > 0) {
- out.write(sync);
vout.writeLong(blockCount);
buffer.writeTo(out);
buffer.reset();
blockCount = 0;
+ out.write(sync);
}
}
/** Return the current position as a value that may be passed to {@link
* DataFileReader#seek(long)}. Forces the end of the current block,
* emitting a synchronization marker. */
- public synchronized long sync() throws IOException {
- writeBlock();
- return out.tell();
- }
+ public long sync() throws IOException {
+ assertOpen();
+ writeBlock();
+ return out.tell();
+ }
- /** Flush the current state of the file, including metadata. */
- public synchronized void flush() throws IOException {
- writeFooter();
- out.flush();
- }
+ /** Flush the current state of the file. */
+ public void flush() throws IOException {
+ sync();
+ out.flush();
+ }
/** Close the file. */
- public synchronized void close() throws IOException {
- flush();
- out.close();
- }
-
- private void writeFooter() throws IOException {
- writeBlock(); // flush any data
- setMeta(DataFileConstants.COUNT, count); // update count
- bufOut.writeMapStart(); // write meta entries
- bufOut.setItemCount(meta.size());
- for (Map.Entry<String,byte[]> entry : meta.entrySet()) {
- bufOut.startItem();
- bufOut.writeString(entry.getKey());
- bufOut.writeBytes(entry.getValue());
- }
- bufOut.writeMapEnd();
-
- int size = buffer.size()+4;
- out.write(sync);
-    vout.writeLong(DataFileConstants.FOOTER_BLOCK); // tag the block
- vout.writeLong(size);
- buffer.writeTo(out);
- buffer.reset();
- out.write((byte)(size >>> 24)); out.write((byte)(size >>> 16));
- out.write((byte)(size >>> 8)); out.write((byte)(size >>> 0));
+ public void close() throws IOException {
+ flush();
+ out.close();
+ isOpen = false;
}
private class BufferedFileOutputStream extends BufferedOutputStream {
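
For illustration only (not part of the patch): a sketch of the revised
writer API. setMeta() must precede create(), since metadata is written into
the file header; the class name, method name, and metadata key are
hypothetical.

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericDatumWriter;

    public class WriteSketch {
      static void write(Schema schema, File file, Iterable<Object> data)
          throws IOException {
        DataFileWriter<Object> writer =
          new DataFileWriter<Object>(new GenericDatumWriter<Object>())
            .setMeta("application", "sketch")  // legal only before create()
            .create(schema, file);             // writes magic, meta, sync
        try {
          for (Object datum : data)
            writer.append(datum);
        } finally {
          writer.close();
        }
      }
    }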
Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileReadTool.java Thu Dec 31 17:01:58 2009
@@ -61,8 +61,7 @@
Schema schema = fileReader.getSchema();
DatumWriter<Object> writer = new GenericDatumWriter<Object>(schema);
Encoder encoder = new JsonEncoder(schema, (JsonGenerator)null);
- Object datum;
- while (null != (datum = fileReader.next(null))) {
+ for (Object datum : fileReader) {
// init() recreates the internal Jackson JsonGenerator
encoder.init(out);
writer.write(datum, encoder);
Modified: hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/tool/DataFileWriteTool.java Thu Dec 31 17:01:58 2009
@@ -59,8 +59,8 @@
try {
DataInputStream din = new DataInputStream(input);
DataFileWriter<Object> writer =
- new DataFileWriter<Object>(schema, out,
- new GenericDatumWriter<Object>());
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .create(schema, out);
Decoder decoder = new JsonDecoder(schema, din);
Object datum;
while (true) {
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/RandomData.java Thu Dec 31 17:01:58 2009
@@ -18,7 +18,6 @@
package org.apache.avro;
import java.io.File;
-import java.io.FileOutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Iterator;
@@ -129,9 +128,8 @@
}
Schema sch = Schema.parse(new File(args[0]));
DataFileWriter<Object> writer =
- new DataFileWriter<Object>(sch,
- new FileOutputStream(new File(args[1]),false),
- new GenericDatumWriter<Object>());
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .create(sch, new File(args[1]));
try {
for (Object datum : new RandomData(sch, Integer.parseInt(args[2]))) {
writer.append(datum);
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFile.java Thu Dec 31 17:01:58 2009
@@ -28,6 +28,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import java.util.Random;
import java.io.File;
import java.io.IOException;
@@ -52,11 +53,14 @@
@Test
public void testGenericWrite() throws IOException {
DataFileWriter<Object> writer =
- new DataFileWriter<Object>(SCHEMA, FILE,
- new GenericDatumWriter<Object>());
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .create(SCHEMA, FILE);
try {
+ int count = 0;
for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) {
writer.append(datum);
+ if (++count%(COUNT/3) == 0)
+ writer.sync(); // force some syncs mid-file
}
} finally {
writer.close();
@@ -68,7 +72,6 @@
DataFileReader<Object> reader =
new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
try {
- assertEquals(COUNT, reader.getCount());
Object datum = null;
if (VALIDATE) {
for (Object expected : new RandomData(SCHEMA, COUNT, SEED)) {
@@ -86,10 +89,38 @@
}
@Test
+ public void testSplits() throws IOException {
+ DataFileReader<Object> reader =
+ new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
+ Random rand = new Random(SEED);
+ try {
+ int splits = 10; // number of splits
+ int length = (int)FILE.length(); // length of file
+ int end = length; // end of split
+ int remaining = end; // bytes remaining
+ int count = 0; // count of entries
+ while (remaining > 0) {
+ int start = Math.max(0, end - rand.nextInt(2*length/splits));
+ reader.sync(start); // count entries in split
+ while (!reader.pastSync(end)) {
+ reader.next();
+ count++;
+ }
+ remaining -= end-start;
+ end = start;
+ }
+ assertEquals(COUNT, count);
+ } finally {
+ reader.close();
+ }
+ }
+
+ @Test
public void testGenericAppend() throws IOException {
long start = FILE.length();
DataFileWriter<Object> writer =
- new DataFileWriter<Object>(FILE, new GenericDatumWriter<Object>());
+ new DataFileWriter<Object>(new GenericDatumWriter<Object>())
+ .appendTo(FILE);
try {
for (Object datum : new RandomData(SCHEMA, COUNT, SEED+1)) {
writer.append(datum);
@@ -100,7 +131,6 @@
DataFileReader<Object> reader =
new DataFileReader<Object>(FILE, new GenericDatumReader<Object>());
try {
- assertEquals(COUNT*2, reader.getCount());
reader.seek(start);
Object datum = null;
if (VALIDATE) {
@@ -118,18 +148,12 @@
}
}
-
-
- protected void readFile(File f,
- DatumReader<Object> datumReader, boolean reuse)
+ protected void readFile(File f, DatumReader<Object> datumReader)
throws IOException {
System.out.println("Reading "+ f.getName());
DataFileReader<Object> reader =
new DataFileReader<Object>(new SeekableFileInput(f), datumReader);
- Object datum = null;
- long count = reader.getMetaLong("count");
- for (int i = 0; i < count; i++) {
- datum = reader.next(reuse ? datum : null);
+ for (Object datum : reader) {
assertNotNull(datum);
}
}
@@ -140,10 +164,10 @@
if (args.length > 1)
projection = Schema.parse(new File(args[1]));
TestDataFile tester = new TestDataFile();
-    tester.readFile(input, new GenericDatumReader<Object>(null, projection), false);
+ tester.readFile(input, new GenericDatumReader<Object>(null, projection));
long start = System.currentTimeMillis();
for (int i = 0; i < 4; i++)
-    tester.readFile(input, new GenericDatumReader<Object>(null, projection), false);
+ tester.readFile(input, new GenericDatumReader<Object>(null, projection));
System.out.println("Time: "+(System.currentTimeMillis()-start));
}
@@ -172,7 +196,7 @@
private void readFiles(DatumReader<Object> datumReader) throws IOException {
TestDataFile test = new TestDataFile();
for (File f : DATAFILE_DIR.listFiles())
- test.readFile(f, datumReader, true);
+ test.readFile(f, datumReader);
}
}
}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestDataFileReflect.java Thu Dec 31 17:01:58 2009
@@ -52,8 +52,9 @@
reflectData.getSchema(FooRecord.class),
reflectData.getSchema(BarRecord.class) });
Schema union = Schema.createUnion(schemas);
- DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
- new ReflectDatumWriter(union));
+ DataFileWriter<Object> writer =
+ new DataFileWriter<Object>(new ReflectDatumWriter(union))
+ .create(union, fos);
// test writing to a file
CheckList check = new CheckList();
@@ -66,59 +67,10 @@
ReflectDatumReader din = new ReflectDatumReader();
SeekableFileInput sin = new SeekableFileInput(FILE);
DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
- Object datum = null;
- long count = reader.getMetaLong("count");
- for (int i = 0; i < count; i++) {
- datum = reader.next(datum);
- check.assertEquals(datum, i);
- }
- reader.close();
- }
-
- /*
- * Test that using multiple schemas in a file works doing a union for new
- * types as they come.
- */
- @Test
- public void testMultiReflectWithUntionAfterWriting() throws IOException {
- FileOutputStream fos = new FileOutputStream(FILE);
-
- ReflectData reflectData = ReflectData.get();
- List<Schema> schemas = new ArrayList<Schema>();
- schemas.add(reflectData.getSchema(FooRecord.class));
- Schema union = Schema.createUnion(schemas);
- DataFileWriter<Object> writer = new DataFileWriter<Object>(union, fos,
- new ReflectDatumWriter(union));
-
- CheckList check = new CheckList();
- // write known type
- write(writer, new FooRecord(10), check);
- write(writer, new FooRecord(15), check);
-
- // we have a new type, add it to the file
- writer.addSchema(reflectData.getSchema(BarRecord.class));
-
- // test writing those new types to a file
- write(writer, new BarRecord("One beer please"), check);
- write(writer, new BarRecord("Two beers please"), check);
-
- // does foo record still work?
- write(writer, new FooRecord(20), check);
-
- // get one more bar in, just for laughs
- write(writer, new BarRecord("Many beers please"), check);
-
- writer.close();
-
- ReflectDatumReader din = new ReflectDatumReader();
- SeekableFileInput sin = new SeekableFileInput(FILE);
- DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
- Object datum = null;
- long count = reader.getMetaLong("count");
- for (int i = 0; i < count; i++) {
- datum = reader.next(datum);
- check.assertEquals(datum, i);
- }
+ int count = 0;
+ for (Object datum : reader)
+ check.assertEquals(datum, count++);
+ Assert.assertEquals(count, check.size());
reader.close();
}
@@ -131,8 +83,9 @@
ReflectData reflectData = ReflectData.AllowNull.get();
Schema schema = reflectData.getSchema(BarRecord.class);
- DataFileWriter<Object> writer = new DataFileWriter<Object>(schema, fos,
- new ReflectDatumWriter(BarRecord.class, reflectData));
+ DataFileWriter<Object> writer = new DataFileWriter<Object>
+ (new ReflectDatumWriter(BarRecord.class, reflectData))
+ .create(schema, fos);
// test writing to a file
CheckList check = new CheckList();
@@ -145,12 +98,10 @@
ReflectDatumReader din = new ReflectDatumReader();
SeekableFileInput sin = new SeekableFileInput(FILE);
DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
- Object datum = null;
- long count = reader.getMetaLong("count");
- for (int i = 0; i < count; i++) {
- datum = reader.next(datum);
- check.assertEquals(datum, i);
- }
+ int count = 0;
+ for (Object datum : reader)
+ check.assertEquals(datum, count++);
+ Assert.assertEquals(count, check.size());
reader.close();
}
@@ -162,8 +113,9 @@
FileOutputStream fos = new FileOutputStream(FILE);
Schema schema = ReflectData.get().getSchema(BazRecord.class);
- DataFileWriter<Object> writer = new DataFileWriter<Object>(schema, fos,
- new ReflectDatumWriter(schema));
+ DataFileWriter<Object> writer =
+ new DataFileWriter<Object>(new ReflectDatumWriter(schema))
+ .create(schema, fos);
// test writing to a file
CheckList check = new CheckList();
@@ -174,12 +126,10 @@
ReflectDatumReader din = new ReflectDatumReader();
SeekableFileInput sin = new SeekableFileInput(FILE);
DataFileReader<Object> reader = new DataFileReader<Object>(sin, din);
- Object datum = null;
- long count = reader.getMetaLong("count");
- for (int i = 0; i < count; i++) {
- datum = reader.next(datum);
- check.assertEquals(datum, i);
- }
+ int count = 0;
+ for (Object datum : reader)
+ check.assertEquals(datum, count++);
+ Assert.assertEquals(count, check.size());
reader.close();
}
Modified: hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java?rev=894889&r1=894888&r2=894889&view=diff
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java (original)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/tool/TestDataFileTools.java Thu Dec 31 17:01:58 2009
@@ -49,8 +49,9 @@
sampleFile = AvroTestUtil.tempFile(TestDataFileTools.class + ".avro");
schema = Schema.create(Type.INT);
- DataFileWriter<Object> writer = new DataFileWriter<Object>(
- schema, sampleFile, new GenericDatumWriter<Object>(schema));
+    DataFileWriter<Object> writer
+      = new DataFileWriter<Object>(new GenericDatumWriter<Object>(schema))
+      .create(schema, sampleFile);
StringBuilder builder = new StringBuilder();
for (int i = 0; i < COUNT; ++i) {
@@ -106,15 +107,13 @@
// Read it back, and make sure it's valid.
GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
DataFileReader<Object> fileReader = new DataFileReader<Object>(outFile,reader);
- Object datum;
int i = 0;
- while (null != (datum = fileReader.next(null))) {
+ for (Object datum : fileReader) {
assertEquals(i, datum);
i++;
}
assertEquals(COUNT, i);
assertEquals(schema, fileReader.getSchema());
- assertEquals(COUNT, fileReader.getCount());
}
@Test
@@ -148,7 +147,7 @@
DataFileReader<Object> fileReader =
new DataFileReader<Object>(outFile,reader);
int i = 0;
- while (null != fileReader.next(null)) {
+ for (Object datum : fileReader) {
i++;
}
return i;