Fixing avro deserialization

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/a901018d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/a901018d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/a901018d

Branch: refs/heads/master
Commit: a901018d2bafb8e496c21de2da830c6814203f92
Parents: 68c341b
Author: Jakub Jankowski <[email protected]>
Authored: Wed May 10 14:37:25 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../samoa/streams/kafka/KafkaAvroMapper.java    | 110 ++++++++++---------
 .../kafka/KafkaEntranceProcessorTest.java       |   5 +-
 2 files changed, 61 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a901018d/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
index 91902d0..42d11bc 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaAvroMapper.java
@@ -21,14 +21,17 @@ import java.io.File;
 import java.io.IOException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.avro.reflect.ReflectDatumWriter;
 import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
@@ -71,65 +74,22 @@ public class KafkaAvroMapper implements 
KafkaDeserializer<InstanceContentEvent>,
 
        @Override
        public byte[] serialize(InstanceContentEvent message) {
-               return toBytesGeneric(InstanceContentEvent.class, message);
+               return avroSerialize(InstanceContentEvent.class, message);
        }
 
        @Override
        public InstanceContentEvent deserialize(byte[] message) {
-               return avroDeserialize(message, InstanceContentEvent.class, 
null);
+               return avroDeserialize(message, InstanceContentEvent.class);
        }
 
-       public static <T> byte[] avroSerialize(Class<T> clazz, Object object) {
-               byte[] ret = null;
-               try {
-                       if (object == null || !(object instanceof 
SpecificRecord)) {
-                               return null;
-                       }
-
-                       T record = (T) object;
-                       ByteArrayOutputStream out = new ByteArrayOutputStream();
-                       Encoder e = 
EncoderFactory.get().directBinaryEncoder(out, null);
-                       SpecificDatumWriter<T> w = new 
SpecificDatumWriter<T>(clazz);
-                       w.write(record, e);
-                       e.flush();
-                       ret = out.toByteArray();
-               } catch (IOException e) {
-
-               }
-
-               return ret;
-       }
-
-       public static <T> T avroDeserialize(byte[] avroBytes, Class<T> clazz, 
Schema schema) {
-               T ret = null;
-               try {
-                       ByteArrayInputStream in = new 
ByteArrayInputStream(avroBytes);
-                       Decoder d = 
DecoderFactory.get().directBinaryDecoder(in, null);
-                       SpecificDatumReader<T> reader = new 
SpecificDatumReader<T>(clazz);
-                       ret = reader.read(null, d);
-               } catch (IOException e) {
-
-               }
-
-               return ret;
-       }
-
-       public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) {
-               final ByteArrayOutputStream bout = new ByteArrayOutputStream();
-               final Schema schema = 
ReflectData.AllowNull.get().getSchema(cls);
-               final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema);
-               final BinaryEncoder binEncoder = 
EncoderFactory.get().binaryEncoder(bout, null);
-               try {
-                       writer.write(v, binEncoder);
-                       binEncoder.flush();
-               } catch (final Exception e) {
-                       throw new RuntimeException(e);
-               }
 
-               return bout.toByteArray();
-       }
-
-       public static <V> byte[] avroBurrSerialize(final Class<V> cls, final V 
v) {
+       /** 
+        * Avro serialization based on specified schema
+        * @param cls
+        * @param v
+        * @return
+        */
+       public static <V> byte[] avroSerialize(final Class<V> cls, final V v) {
                ByteArrayOutputStream bout = new ByteArrayOutputStream();
                try {
                        Schema schema = new Schema.Parser().parse(new 
File("C:/java/avro/kafka.avsc"));
@@ -154,5 +114,51 @@ public class KafkaAvroMapper implements 
KafkaDeserializer<InstanceContentEvent>,
                return bout.toByteArray();
                
        }
+       
+       /** 
+        * Avro deserialization based on specified schema
+        * @param cls
+        * @param v
+        * @return
+        */
+       public static <V> V avroDeserialize(byte[] avroBytes, Class<V> clazz) {
+               V ret = null;
+               try {
+                       Schema schema = new Schema.Parser().parse(new 
File("C:/java/avro/kafka.avsc"));
+                       ByteArrayInputStream in = new 
ByteArrayInputStream(avroBytes);
+                       DatumReader<V> reader = new 
GenericDatumReader<>(schema);
+                       
+                       Decoder decoder = 
DecoderFactory.get().directBinaryDecoder(in, null);
+                       
+                       ret = reader.read(null, decoder);
+               } catch (IOException e) {
+                       e.printStackTrace();
+               } catch (final Exception e) {
+                       throw new RuntimeException(e);
+               }
+               
+               return ret;
+       }
+       
+       /** 
+        * Avro serialization using reflection
+        * @param cls
+        * @param v
+        * @return
+        */
+       public static <V> byte[] toBytesGeneric(final Class<V> cls, final V v) {
+               final ByteArrayOutputStream bout = new ByteArrayOutputStream();
+               final Schema schema = 
ReflectData.AllowNull.get().getSchema(cls);
+               final DatumWriter<V> writer = new ReflectDatumWriter<V>(schema);
+               final BinaryEncoder binEncoder = 
EncoderFactory.get().binaryEncoder(bout, null);
+               try {
+                       writer.write(v, binEncoder);
+                       binEncoder.flush();
+               } catch (final Exception e) {
+                       throw new RuntimeException(e);
+               }
+
+               return bout.toByteArray();
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/a901018d/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
index 3da9d6f..bc2a11e 100644
--- 
a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
+++ 
b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -214,7 +214,7 @@ public class KafkaEntranceProcessorTest {
                 for (i = 0; i < NUM_INSTANCES; i++) {
                     try {
                        //byte[] data = 
avroMapper.serialize(TestUtilsForKafka.getData(r, 10, header));
-                       byte[] data = 
KafkaAvroMapper.avroBurrSerialize(InstanceContentEvent.class, 
TestUtilsForKafka.getData(r, 10, header));
+                       byte[] data = 
KafkaAvroMapper.avroSerialize(InstanceContentEvent.class, 
TestUtilsForKafka.getData(r, 10, header));
                        if(data == null)
                                
Logger.getLogger(KafkaEntranceProcessorTest.class.getName()).log(Level.INFO, 
"Serialize result: null ("+i+")");
                         ProducerRecord<String, byte[]> record = new 
ProducerRecord(TOPIC, data);
@@ -233,7 +233,8 @@ public class KafkaEntranceProcessorTest {
 
         int z = 0;
         while (kep.hasNext() && z < NUM_INSTANCES) {
-            logger.log(Level.INFO, "{0} {1}", new Object[]{z++, 
kep.nextEvent().toString()});
+               InstanceContentEvent event = 
(InstanceContentEvent)kep.nextEvent();
+            logger.log(Level.INFO, "{0} {1}", new Object[]{z++, 
event.getInstance().toString()});
         }       
 
         assertEquals("Number of sent and received instances", NUM_INSTANCES, 
z);        

Reply via email to