Fix Avro deserialization
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/a901018d Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/a901018d Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/a901018d Branch: refs/heads/master Commit: a901018d2bafb8e496c21de2da830c6814203f92 Parents: 68c341b Author: Jakub Jankowski <[email protected]> Authored: Wed May 10 14:37:25 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../samoa/streams/kafka/KafkaAvroMapper.java | 110 ++++++++++--------- .../kafka/KafkaEntranceProcessorTest.java | 5 +- 2 files changed, 61 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a901018d/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java index 91902d0..42d11bc 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java @@ -21,14 +21,17 @@ import java.io.File; import java.io.IOException; import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Decoder; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.avro.reflect.ReflectData; +import 
org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.avro.specific.SpecificDatumReader; import org.apache.avro.specific.SpecificDatumWriter; @@ -71,65 +74,22 @@ public class KafkaAvroMapper implements KafkaDeserializer<InstanceContentEvent>, @Override public byte[] serialize(InstanceContentEvent message) { - return toBytesGeneric(InstanceContentEvent.class, message); + return avroSerialize(InstanceContentEvent.class, message); } @Override public InstanceContentEvent deserialize(byte[] message) { - return avroDeserialize(message, InstanceContentEvent.class, null); + return avroDeserialize(message, InstanceContentEvent.class); } - public static <T> byte[] avroSerialize(Class<T> clazz, Object object) { - byte[] ret = null; - try { - if (object == null || !(object instanceof SpecificRecord)) { - return null; - } - - T record = (T) object; - ByteArrayOutputStream out = new ByteArrayOutputStream(); - Encoder e = EncoderFactory.get().directBinaryEncoder(out, null); - SpecificDatumWriter<T> w = new SpecificDatumWriter<T>(clazz); - w.write(record, e); - e.flush(); - ret = out.toByteArray(); - } catch (IOException e) { - - } - - return ret; - } - - public static <T> T avroDeserialize(byte[] avroBytes, Class<T> clazz, Schema schema) { - T ret = null; - try { - ByteArrayInputStream in = new ByteArrayInputStream(avroBytes); - Decoder d = DecoderFactory.get().directBinaryDecoder(in, null); - SpecificDatumReader<T> reader = new SpecificDatumReader<T>(clazz); - ret = reader.read(null, d); - } catch (IOException e) { - - } - - return ret; - } - - public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) { - final ByteArrayOutputStream bout = new ByteArrayOutputStream(); - final Schema schema = ReflectData.AllowNull.get().getSchema(cls); - final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema); - final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null); - try { - writer.write(v, 
binEncoder); - binEncoder.flush(); - } catch (final Exception e) { - throw new RuntimeException(e); - } - return bout.toByteArray(); - } - - public static <V> byte[] avroBurrSerialize(final Class<V> cls, final V v) { + /** + * Avro serialization based on specified schema + * @param cls + * @param v + * @return + */ + public static <V> byte[] avroSerialize(final Class<V> cls, final V v) { ByteArrayOutputStream bout = new ByteArrayOutputStream(); try { Schema schema = new Schema.Parser().parse(new File("C:/java/avro/kafka.avsc")); @@ -154,5 +114,51 @@ public class KafkaAvroMapper implements KafkaDeserializer<InstanceContentEvent>, return bout.toByteArray(); } + + /** + * Avro deserialization based on specified schema + * @param cls + * @param v + * @return + */ + public static <V> V avroDeserialize(byte[] avroBytes, Class<V> clazz) { + V ret = null; + try { + Schema schema = new Schema.Parser().parse(new File("C:/java/avro/kafka.avsc")); + ByteArrayInputStream in = new ByteArrayInputStream(avroBytes); + DatumReader<V> reader = new GenericDatumReader<>(schema); + + Decoder decoder = DecoderFactory.get().directBinaryDecoder(in, null); + + ret = reader.read(null, decoder); + } catch (IOException e) { + e.printStackTrace(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + + return ret; + } + + /** + * Avro serialization using reflection + * @param cls + * @param v + * @return + */ + public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) { + final ByteArrayOutputStream bout = new ByteArrayOutputStream(); + final Schema schema = ReflectData.AllowNull.get().getSchema(cls); + final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema); + final BinaryEncoder binEncoder = EncoderFactory.get().binaryEncoder(bout, null); + try { + writer.write(v, binEncoder); + binEncoder.flush(); + } catch (final Exception e) { + throw new RuntimeException(e); + } + + return bout.toByteArray(); + } } 
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a901018d/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java index 3da9d6f..bc2a11e 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -214,7 +214,7 @@ public class KafkaEntranceProcessorTest { for (i = 0; i < NUM_INSTANCES; i++) { try { //byte[] data = avroMapper.serialize(TestUtilsForKafka.getData(r, 10, header)); - byte[] data = KafkaAvroMapper.avroBurrSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header)); + byte[] data = KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, TestUtilsForKafka.getData(r, 10, header)); if(data == null) Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, "Serialize result: null ("+i+")"); ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC, data); @@ -233,7 +233,8 @@ public class KafkaEntranceProcessorTest { int z = 0; while (kep.hasNext() && z < NUM_INSTANCES) { - logger.log(Level.INFO, "{0} {1}", new Object[]{z++, kep.nextEvent().toString()}); + InstanceContentEvent event = (InstanceContentEvent)kep.nextEvent(); + logger.log(Level.INFO, "{0} {1}", new Object[]{z++, event.getInstance().toString()}); } assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
