Author: cutting
Date: Thu May 17 19:50:49 2012
New Revision: 1339825
URL: http://svn.apache.org/viewvc?rev=1339825&view=rev
Log:
AVRO-1081. Java: Fix to be able to write ByteBuffers that have no backing
array. Also fix reflection to correctly read ByteBuffer fields.
Added:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java
(with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu May 17 19:50:49 2012
@@ -58,6 +58,10 @@ Avro 1.7.0 (unreleased)
AVRO-1065. NodeRecord::isValid() treats records with no fields as
invalid. (thiru)
+ AVRO-1081. Java: Fix to be able to write ByteBuffers that have no
+ backing array. Also fix reflection to correctly read ByteBuffer
+ fields. (cutting)
+
Avro 1.6.3 (5 March 2012)
AVRO-1077. Missing 'inline' for union set function. (thiru)
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
Thu May 17 19:50:49 2012
@@ -279,8 +279,7 @@ public class DataFileWriter<D> implement
* Appending non-conforming data may result in an unreadable file. */
public void appendEncoded(ByteBuffer datum) throws IOException {
assertOpen();
- int start = datum.position();
- bufOut.writeFixed(datum.array(), start, datum.limit()-start);
+ bufOut.writeFixed(datum);
blockCount++;
writeIfBlockFull();
}
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
Thu May 17 19:50:49 2012
@@ -148,7 +148,7 @@ public class GenericDatumReader<D> imple
case UNION: return read(old, expected.getTypes().get(in.readIndex()),
in);
case FIXED: return readFixed(old, expected, in);
case STRING: return readString(old, expected, in);
- case BYTES: return readBytes(old, in);
+ case BYTES: return readBytes(old, expected, in);
case INT: return readInt(old, expected, in);
case LONG: return in.readLong();
case FLOAT: return in.readFloat();
@@ -344,6 +344,14 @@ public class GenericDatumReader<D> imple
/** Called to read byte arrays. Subclasses may override to use a different
* byte array representation. By default, this calls {@link
* Decoder#readBytes(ByteBuffer)}.*/
+ protected Object readBytes(Object old, Schema s, Decoder in)
+ throws IOException {
+ return readBytes(old, in);
+ }
+
+ /** Called to read byte arrays. Subclasses may override to use a different
+ * byte array representation. By default, this calls {@link
+ * Decoder#readBytes(ByteBuffer)}.*/
protected Object readBytes(Object old, Decoder in) throws IOException {
return in.readBytes(old instanceof ByteBuffer ? (ByteBuffer) old : null);
}
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BinaryEncoder.java
Thu May 17 19:50:49 2012
@@ -57,10 +57,13 @@ public abstract class BinaryEncoder exte
@Override
public void writeBytes(ByteBuffer bytes) throws IOException {
- int pos = bytes.position();
- int start = bytes.arrayOffset() + pos;
- int len = bytes.limit() - pos;
- writeBytes(bytes.array(), start, len);
+ int len = bytes.limit() - bytes.position();
+ if (0 == len) {
+ writeZero();
+ } else {
+ writeInt(len);
+ writeFixed(bytes);
+ }
}
@Override
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/BufferedBinaryEncoder.java
Thu May 17 19:50:49 2012
@@ -19,6 +19,9 @@ package org.apache.avro.io;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
import org.apache.avro.AvroRuntimeException;
@@ -151,6 +154,15 @@ public class BufferedBinaryEncoder exten
System.arraycopy(bytes, start, buf, pos, len);
pos += len;
}
+
+ @Override
+ public void writeFixed(ByteBuffer bytes) throws IOException {
+ if (!bytes.hasArray() && bytes.remaining() > bulkLimit) {
+ sink.innerWrite(bytes); // bypass the buffer
+ } else {
+ super.writeFixed(bytes);
+ }
+ }
@Override
protected void writeZero() throws IOException {
@@ -182,15 +194,20 @@ public class BufferedBinaryEncoder exten
protected ByteSink() {}
/** Write data from bytes, starting at off, for len bytes **/
protected abstract void innerWrite(byte[] bytes, int off, int len) throws
IOException;
+
+ protected abstract void innerWrite(ByteBuffer buff) throws IOException;
+
/** Flush the underlying output, if supported **/
protected abstract void innerFlush() throws IOException;
}
static class OutputStreamSink extends ByteSink {
private final OutputStream out;
+ private final WritableByteChannel channel;
private OutputStreamSink(OutputStream out) {
super();
this.out = out;
+ channel = Channels.newChannel(out);
}
@Override
protected void innerWrite(byte[] bytes, int off, int len)
@@ -201,5 +218,9 @@ public class BufferedBinaryEncoder exten
protected void innerFlush() throws IOException {
out.flush();
}
+ @Override
+ protected void innerWrite(ByteBuffer buff) throws IOException {
+ channel.write(buff);
+ }
}
}
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java
(original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/io/Encoder.java Thu
May 17 19:50:49 2012
@@ -166,6 +166,19 @@ public abstract class Encoder implements
writeFixed(bytes, 0, bytes.length);
}
+ /** A version of {@link writeFixed} for ByteBuffers. */
+ public void writeFixed(ByteBuffer bytes) throws IOException {
+ int pos = bytes.position();
+ int len = bytes.limit() - pos;
+ if (bytes.hasArray()) {
+ writeFixed(bytes.array(), bytes.arrayOffset() + pos, len);
+ } else {
+ byte[] b = new byte[len];
+ bytes.get(b, 0, len);
+ writeFixed(b, 0, len);
+ }
+ }
+
/**
* Writes an enumeration.
* @param e
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
Thu May 17 19:50:49 2012
@@ -262,8 +262,11 @@ public class ReflectData extends Specifi
return super.createSchema(type, names);
if (c.isArray()) { // array
Class component = c.getComponentType();
- if (component == Byte.TYPE) // byte array
- return Schema.create(Schema.Type.BYTES);
+ if (component == Byte.TYPE) { // byte array
+ Schema result = Schema.create(Schema.Type.BYTES);
+ result.addProp(CLASS_PROP, c.getName());
+ return result;
+ }
Schema result = Schema.createArray(createSchema(component, names));
setElement(result, component);
return result;
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
Thu May 17 19:50:49 2012
@@ -121,11 +121,17 @@ public class ReflectDatumReader<T> exten
protected Object createString(String value) { return value; }
@Override
- protected Object readBytes(Object old, Decoder in) throws IOException {
+ protected Object readBytes(Object old, Schema s, Decoder in)
+ throws IOException {
ByteBuffer bytes = in.readBytes(null);
- byte[] result = new byte[bytes.remaining()];
- bytes.get(result);
- return result;
+ Class c = ReflectData.getClassProp(s, ReflectData.CLASS_PROP);
+ if (c != null && c.isArray()) {
+ byte[] result = new byte[bytes.remaining()];
+ bytes.get(result);
+ return result;
+ } else {
+ return bytes;
+ }
}
@Override
Modified:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java?rev=1339825&r1=1339824&r2=1339825&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java
(original)
+++ avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestReflect.java
Thu May 17 19:50:49 2012
@@ -101,7 +101,8 @@ public class TestReflect {
}
@Test public void testBytes() {
- check(new byte[0], "\"bytes\"");
+ check(ByteBuffer.allocate(0), "\"bytes\"");
+ check(new byte[0], "{\"type\":\"bytes\",\"java-class\":\"[B\"}");
}
@Test public void testUnionWithCollection() {
Added:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java?rev=1339825&view=auto
==============================================================================
---
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java
(added)
+++
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java
Thu May 17 19:50:49 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.reflect;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.reflect.ReflectDatumWriter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ByteBufferTest {
+ static class X{
+ String name = "";
+ ByteBuffer content;
+ }
+ File tmpdir;
+ File content;
+
+ @Before public void before() throws IOException{
+ tmpdir = File.createTempFile("avro", "test");
+ tmpdir.delete();
+ tmpdir.mkdirs();
+ content = new File(tmpdir,"test-content");
+ FileOutputStream out = new FileOutputStream(content);
+ for(int i=0;i<100000;i++){
+ out.write("hello world\n".getBytes());
+ }
+ out.close();
+ }
+
+ @Test public void test() throws Exception{
+ Schema schema = ReflectData.get().getSchema(X.class);
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ writeOneXAsAvro(schema, bout);
+ X record = readOneXFromAvro(schema, bout);
+
+ String expected = getmd5(content);
+ String actual = getmd5(record.content);
+ assertEquals("md5 for result differed from input",expected,actual);
+ }
+
+ private X readOneXFromAvro(Schema schema, ByteArrayOutputStream bout)
+ throws IOException {
+ SeekableByteArrayInput input = new
SeekableByteArrayInput(bout.toByteArray());
+ ReflectDatumReader<X> datumReader = new ReflectDatumReader<X>(schema);
+ FileReader<X> reader = DataFileReader.openReader(input, datumReader);
+ Iterator<X> it = reader.iterator();
+ assertTrue("missing first record",it.hasNext());
+ X record = it.next();
+ assertFalse("should be no more records - only wrote one out",it.hasNext());
+ return record;
+ }
+
+ private void writeOneXAsAvro(Schema schema, ByteArrayOutputStream bout)
+ throws IOException, FileNotFoundException {
+ DatumWriter<X> datumWriter = new ReflectDatumWriter<X>(schema);
+ DataFileWriter<X> writer = new DataFileWriter<X>(datumWriter);
+ writer.create(schema, bout);
+ X x = new X();
+ x.name = "xxx";
+ FileInputStream fis = new FileInputStream(content);
+ try{
+ FileChannel channel = fis.getChannel();
+ try{
+ long contentLength = content.length();
+ //set the content to be a file channel.
+ ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0,
contentLength);
+ x.content = buffer;
+ writer.append(x);
+ }finally{
+ channel.close();
+ }
+ }finally{
+ fis.close();
+ }
+ writer.flush();
+ writer.close();
+ }
+
+ private String getmd5(File file) throws Exception{
+ FileInputStream fis = new FileInputStream(content);
+ try{
+ FileChannel channel = fis.getChannel();
+ try{
+ long contentLength = content.length();
+ ByteBuffer buffer = channel.map(FileChannel.MapMode.READ_ONLY, 0,
contentLength);
+ return getmd5(buffer);
+ }finally{
+ channel.close();
+ }
+ }finally{
+ fis.close();
+ }
+ }
+
+ String getmd5(ByteBuffer buffer) throws NoSuchAlgorithmException{
+ MessageDigest mdEnc = MessageDigest.getInstance("MD5");
+ mdEnc.reset();
+ mdEnc.update(buffer);
+ return new BigInteger(1, mdEnc.digest()).toString(16);
+ }
+}
Propchange:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/ByteBufferTest.java
------------------------------------------------------------------------------
svn:eol-style = native