Updated Branches: refs/heads/trunk f226cd1b0 -> 24f3eaf20
minor enhancements to Avro codec Project: http://git-wip-us.apache.org/repos/asf/mina/repo Commit: http://git-wip-us.apache.org/repos/asf/mina/commit/24f3eaf2 Tree: http://git-wip-us.apache.org/repos/asf/mina/tree/24f3eaf2 Diff: http://git-wip-us.apache.org/repos/asf/mina/diff/24f3eaf2 Branch: refs/heads/trunk Commit: 24f3eaf20d3c63e7d6095b57fcc175a8a47161bb Parents: f226cd1 Author: paliwalashish <[email protected]> Authored: Sun Jul 28 03:02:41 2013 +0530 Committer: paliwalashish <[email protected]> Committed: Sun Jul 28 03:02:41 2013 +0530 ---------------------------------------------------------------------- .../org/apache/mina/avro/codec/AvroDecoder.java | 7 ++- .../org/apache/mina/avro/codec/AvroEncoder.java | 7 ++- .../codec/serialization/AvroMessageDecoder.java | 27 +++++++-- .../codec/serialization/AvroMessageEncoder.java | 59 +++++++++++++++----- .../mina/avro/codec/serialization/AvroTest.java | 43 +++++++++----- 5 files changed, 105 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java ---------------------------------------------------------------------- diff --git a/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java b/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java index 997d73a..25ce582 100644 --- a/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java +++ b/avro/src/main/java/org/apache/mina/avro/codec/AvroDecoder.java @@ -21,6 +21,7 @@ package org.apache.mina.avro.codec; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericContainer; import org.apache.mina.avro.codec.serialization.AvroMessageDecoder; import org.apache.mina.codec.delimited.IoBufferDecoder; import org.apache.mina.codec.delimited.SizePrefixedDecoder; @@ -29,12 +30,12 @@ import org.apache.mina.codec.delimited.ints.VarInt; /** * */ -public class AvroDecoder<GenericRecord> extends SizePrefixedDecoder<GenericRecord> { +public class AvroDecoder<T extends GenericContainer> extends SizePrefixedDecoder<T> { private Schema schema; - public AvroDecoder(IoBufferDecoder<Integer> sizeDecoder, IoBufferDecoder<GenericRecord> payloadDecoder, Schema schema) { - super(new VarInt().getDecoder(), new AvroMessageDecoder<GenericRecord>(schema)); + public AvroDecoder(IoBufferDecoder<Integer> sizeDecoder, IoBufferDecoder<T> payloadDecoder, Schema schema) { + super(new VarInt().getDecoder(), new AvroMessageDecoder<T>(schema)); this.schema = schema; } } http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java ---------------------------------------------------------------------- diff --git a/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java b/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java index 51d3b75..22c9467 100644 --- a/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java +++ b/avro/src/main/java/org/apache/mina/avro/codec/AvroEncoder.java @@ -19,6 +19,7 @@ */ package org.apache.mina.avro.codec; +import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericRecord; import org.apache.mina.avro.codec.serialization.AvroMessageEncoder; import org.apache.mina.codec.delimited.ByteBufferEncoder; @@ -28,8 +29,8 @@ import org.apache.mina.codec.delimited.ints.VarInt; /** * */ -public class AvroEncoder<IN extends GenericRecord> extends SizePrefixedEncoder<GenericRecord> { - public AvroEncoder(ByteBufferEncoder<Integer> sizeEncoder, ByteBufferEncoder<GenericRecord> payloadEncoder) { - super(new VarInt().getEncoder(), new AvroMessageEncoder<GenericRecord>()); +public class AvroEncoder<T extends GenericContainer> extends SizePrefixedEncoder<T> { + public AvroEncoder(ByteBufferEncoder<Integer> sizeEncoder, ByteBufferEncoder<T> payloadEncoder) { + super(new VarInt().getEncoder(), new AvroMessageEncoder<T>()); } } http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java ---------------------------------------------------------------------- diff --git a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java index 2df618e..013fea1 100644 --- a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java +++ b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageDecoder.java @@ -21,19 +21,29 @@ package org.apache.mina.avro.codec.serialization; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; +import org.apache.avro.reflect.ReflectDatumReader; import org.apache.mina.codec.IoBuffer; +import org.apache.mina.codec.ProtocolDecoderException; import org.apache.mina.codec.delimited.IoBufferDecoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; /** + * Avro Message Decoder * + * Uses ReflectDatumReader to read the data from the stream */ -public class AvroMessageDecoder<GenericRecord> extends IoBufferDecoder<GenericRecord> { +public class AvroMessageDecoder<T extends GenericContainer> extends IoBufferDecoder<T> { + + // Logger + public static final Logger LOG = LoggerFactory.getLogger(AvroMessageDecoder.class); private Schema schema; @@ -42,18 +52,23 @@ public class AvroMessageDecoder<GenericRecord> extends IoBufferDecoder<GenericRe * @param schema */ public AvroMessageDecoder(Schema schema) { + if(schema == null) { + LOG.error("Avro Schema passed cannot be null"); + throw new IllegalArgumentException("Avro Schema cannot be null"); + } this.schema = schema; } @Override - public GenericRecord decode(IoBuffer input) { + public T decode(IoBuffer input) { BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(input.array(), null); - GenericDatumReader<GenericRecord> recordGenericDatumReader = new GenericDatumReader<GenericRecord>(schema); - GenericRecord result = null; + ReflectDatumReader<T> reader = new ReflectDatumReader<T>(schema); + T result = null; try { - result = recordGenericDatumReader.read(null, binaryDecoder); + result = reader.read(null, binaryDecoder); }catch (IOException ioEx) { - ioEx.printStackTrace(); + LOG.error("Error while decoding", ioEx); + throw new ProtocolDecoderException(ioEx.getMessage()); } return result; } http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java ---------------------------------------------------------------------- diff --git a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java index ecddad2..3ae9cc8 100644 --- a/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java +++ b/avro/src/main/java/org/apache/mina/avro/codec/serialization/AvroMessageEncoder.java @@ -20,43 +20,76 @@ package org.apache.mina.avro.codec.serialization; +import org.apache.avro.generic.GenericContainer; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; +import org.apache.avro.specific.SpecificDatumWriter; +import org.apache.avro.specific.SpecificRecord; +import org.apache.mina.codec.ProtocolEncoderException; import org.apache.mina.codec.delimited.ByteBufferEncoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; /** + * Avro Message encoder * + * It can be used to handle both a Generic Record as well as Specific Record */ -public class AvroMessageEncoder<OUT extends GenericRecord> extends ByteBufferEncoder<GenericRecord> { +public class AvroMessageEncoder<T extends GenericContainer> extends ByteBufferEncoder<T> { + + // Logger + public static final Logger LOG = LoggerFactory.getLogger(AvroMessageEncoder.class); private ByteBuffer encodedMessage; @Override - public int getEncodedSize(GenericRecord message) { + public int getEncodedSize(T message) { ByteArrayOutputStream out = new ByteArrayOutputStream(); - DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(message.getSchema()); - Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); - try { - writer.write(message, encoder); - encoder.flush(); - byte[] encoded = out.toByteArray(); - encodedMessage = ByteBuffer.wrap(encoded); - out.close(); - } catch (IOException ioEx) { - // :( + + // Need to check for writer + if(message instanceof GenericRecord) { + DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(message.getSchema()); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + try { + writer.write((GenericRecord)message, encoder); + encoder.flush(); + byte[] encoded = out.toByteArray(); + encodedMessage = ByteBuffer.wrap(encoded); + out.close(); + } catch (IOException ioEx) { + LOG.error("error while marshalling", ioEx); + throw new ProtocolEncoderException(ioEx.getMessage()); + } + } else if (message instanceof SpecificRecord) { + DatumWriter<T> writer = new SpecificDatumWriter<T>(message.getSchema()); + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); + try { + writer.write(message, encoder); + encoder.flush(); + byte[] encoded = out.toByteArray(); + encodedMessage = ByteBuffer.wrap(encoded); + out.close(); + } catch (IOException ioEx) { + LOG.error("error while marshalling", ioEx); + throw new ProtocolEncoderException(ioEx.getMessage()); + } + } else { + LOG.warn("Unknown object type, serialization method not known for {}", message.getClass()); + throw new ProtocolEncoderException(message.getClass() + " cannot be Serialized"); } + return encodedMessage != null ? encodedMessage.capacity() : -1; } @Override - public void writeTo(GenericRecord message, ByteBuffer buffer) { + public void writeTo(T message, ByteBuffer buffer) { buffer.put(encodedMessage); } } http://git-wip-us.apache.org/repos/asf/mina/blob/24f3eaf2/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java ---------------------------------------------------------------------- diff --git a/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java b/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java index 5a754fc..8d2c754 100644 --- a/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java +++ b/avro/src/test/java/org/apache/mina/avro/codec/serialization/AvroTest.java @@ -23,6 +23,7 @@ package org.apache.mina.avro.codec.serialization; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; +import org.apache.mina.avro.generated.User; import org.apache.mina.codec.IoBuffer; import org.apache.mina.codec.delimited.ByteBufferEncoder; import org.apache.mina.codec.delimited.IoBufferDecoder; @@ -33,6 +34,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import static org.junit.Assert.assertEquals; + /** * */ @@ -59,26 +62,40 @@ public class AvroTest extends GenericSerializerTest { } @Override - public List<GenericRecord> getObjects() { - List<GenericRecord> genericRecordList = new ArrayList<GenericRecord>(1); - GenericRecord record1 = new GenericData.Record(SCHEMA); - record1.put("name", "Black Jack"); - record1.put("favorite_number", 11); - record1.put("favorite_color", "Black"); - genericRecordList.add(record1); + public List<User> getObjects() { + List<User> genericRecordList = new ArrayList<User>(1); + User user1 = new User("Red User", 11, "Red"); + genericRecordList.add(user1); return genericRecordList; } @Test - public void testMessage() throws Exception { - ByteBufferEncoder<GenericRecord> encoder = getEncoder(); - AvroMessageDecoder<GenericRecord> decoder = new AvroMessageDecoder<GenericRecord>(SCHEMA); + public void testUser() throws Exception { + ByteBufferEncoder<User> encoder = new AvroMessageEncoder<User>(); + AvroMessageDecoder<User> decoder = new AvroMessageDecoder<User>(SCHEMA); - for (GenericRecord object : getObjects()) { - GenericRecord message = decoder.decode(IoBuffer + for (User object : getObjects()) { + User message = decoder.decode(IoBuffer .wrap(encoder.encode(object))); - System.out.println(message); + assertEquals(getObjects().get(0), message); } } + + @Test + public void testGenericMessage() throws Exception { + GenericRecord record1 = new GenericData.Record(SCHEMA); + record1.put("name", "Black Jack"); + record1.put("favorite_number", 11); + record1.put("favorite_color", "Black"); + + ByteBufferEncoder<GenericRecord> encoder = new AvroMessageEncoder<GenericRecord>(); + AvroMessageDecoder<User> decoder = new AvroMessageDecoder<User>(SCHEMA); + + User message = decoder.decode(IoBuffer.wrap(encoder.encode(record1))); + assertEquals(record1.get("name"), message.getName()); + assertEquals(record1.get("favorite_number"), message.getFavoriteNumber()); + assertEquals(record1.get("favorite_color"), message.getFavoriteColor()); + } + }
