Changes in JSON mapper

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/fb17e7f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/fb17e7f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/fb17e7f8

Branch: refs/heads/master
Commit: fb17e7f8304f08b1fc20678ee5c215b4fd8b3d9c
Parents: 3fbfc07
Author: pwawrzyniak <[email protected]>
Authored: Tue May 16 15:00:33 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../streams/kafka/KafkaConsumerThread.java      | 23 ++++++++-
 .../samoa/streams/kafka/KafkaJsonMapper.java    | 49 +++++++++++++-------
 .../kafka/KafkaEntranceProcessorTest.java       | 13 ++++--
 3 files changed, 63 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
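
For readers skimming the diff: the functional change in KafkaJsonMapper is the switch from a Gson InstanceCreator (which always produced a SingleClassInstanceData) to a custom JsonDeserializer that rebuilds a DenseInstanceData from the serialized "attributeValues" array. The self-contained sketch below illustrates the same Gson pattern outside of SAMOA; the Payload and PayloadDeserializer names are illustrative stand-ins, not part of this commit.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import java.lang.reflect.Type;
import java.util.Arrays;

// Hypothetical stand-in for InstanceData/DenseInstanceData, not SAMOA code.
class Payload {
    private final double[] attributeValues;
    Payload(double[] attributeValues) { this.attributeValues = attributeValues; }
    double[] values() { return attributeValues; }
}

// Same pattern as the InstanceDataCustomDeserializer added in this commit:
// pull the concrete field out of the JSON object and construct the desired
// concrete type explicitly.
class PayloadDeserializer implements JsonDeserializer<Payload> {
    @Override
    public Payload deserialize(JsonElement je, Type type, JsonDeserializationContext jdc)
            throws JsonParseException {
        JsonObject obj = je.getAsJsonObject();
        double[] values = jdc.deserialize(obj.get("attributeValues"), double[].class);
        return new Payload(values);
    }
}

public class GsonAdapterSketch {
    public static void main(String[] args) {
        // Register the deserializer the same way the mapper's constructor does.
        Gson gson = new GsonBuilder()
                .registerTypeAdapter(Payload.class, new PayloadDeserializer())
                .create();
        String json = "{\"attributeValues\":[1.0,2.5,3.0]}";
        Payload p = gson.fromJson(json, Payload.class);
        System.out.println(Arrays.toString(p.values())); // prints [1.0, 2.5, 3.0]
    }
}

The practical effect is that a serialized event can be read back even though InstanceData itself is not something Gson can instantiate on its own: the registered adapter tells Gson explicitly which concrete class to build and from which field.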


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/fb17e7f8/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
index 6522f67..a93986e 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java
@@ -15,6 +15,27 @@
  */
 package org.apache.samoa.streams.kafka;
 
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2017 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -28,7 +49,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 
 /**
  *
- * @author pwawrzyniak <your.name at your.org>
+ * @author pwawrzyniak
  */
 class KafkaConsumerThread extends Thread {
 

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/fb17e7f8/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
index 1996b40..2ac3e04 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
@@ -34,39 +34,43 @@ package org.apache.samoa.streams.kafka;
  * limitations under the License.
  * #L%
  */
-
-
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.InstanceCreator;
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
 import java.lang.reflect.Type;
 import java.nio.charset.Charset;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import org.apache.samoa.instances.DenseInstanceData;
 import org.apache.samoa.instances.InstanceData;
-import org.apache.samoa.instances.SingleClassInstanceData;
 import org.apache.samoa.learners.InstanceContentEvent;
 
 /**
- * Sample class for serializing and deserializing {@link InstanceContentEvent} from/to JSON format
+ * Sample class for serializing and deserializing {@link InstanceContentEvent}
+ * from/to JSON format
+ *
  * @author pwawrzyniak
  * @version 0.5.0-incubating-SNAPSHOT
  * @since 0.5.0-incubating
  */
-public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent>{
+public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> {
 
     private final transient Gson gson;
     private final Charset charset;
 
     /**
      * Class constructor
+     *
      * @param charset Charset to be used for bytes parsing
      */
-    public KafkaJsonMapper(Charset charset){
-        this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCreator()).create();
+    public KafkaJsonMapper(Charset charset) {
+        this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCustomDeserializer()).create();
         this.charset = charset;
     }
-    
+
     @Override
     public InstanceContentEvent deserialize(byte[] message) {
         return gson.fromJson(new String(message, this.charset), InstanceContentEvent.class);
@@ -76,14 +80,27 @@ public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>,
     public byte[] serialize(InstanceContentEvent message) {
         return gson.toJson(message).getBytes(this.charset);
     }
-    
-    public class InstanceDataCreator implements InstanceCreator<InstanceData>{
+
+    //Unused
+    public class InstanceDataCreator implements InstanceCreator<InstanceData> {
+
+        @Override
+        public InstanceData createInstance(Type type) {
+            return new DenseInstanceData();
+        }
+    }
+
+    public class InstanceDataCustomDeserializer implements JsonDeserializer<InstanceData> {
 
         @Override
-        public InstanceData createInstance(Type type) {            
-            return new SingleClassInstanceData();
+        public DenseInstanceData deserialize(JsonElement je, Type type, JsonDeserializationContext jdc) throws JsonParseException {
+            double[] attributeValues = null;
+            JsonObject obj = (JsonObject) je;
+            attributeValues = jdc.deserialize(obj.get("attributeValues"), double[].class);
+            DenseInstanceData did = new DenseInstanceData(attributeValues);
+            return did;
         }
-        
+
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/fb17e7f8/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
index 009a6a7..933ba2a 100644
--- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
+++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java
@@ -38,6 +38,8 @@ import com.google.gson.Gson;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
@@ -70,6 +72,7 @@ import org.apache.samoa.instances.InstancesHeader;
 /**
  *
  * @author pwawrzyniak
+ * @author Jakub Jankowski
  */
 public class KafkaEntranceProcessorTest {
 
@@ -137,7 +140,7 @@ public class KafkaEntranceProcessorTest {
     @Test
     public void testFetchingNewDataWithJson() throws InterruptedException, ExecutionException, TimeoutException {
 
-        Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
+        final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName());
         logger.log(Level.INFO, "JSON");
         logger.log(Level.INFO, "testFetchingNewDataWithJson");
         Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT);
@@ -145,7 +148,7 @@ public class KafkaEntranceProcessorTest {
         KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset()));
 
         kep.onCreate(1);
-
+       
         // prepare new thread for data producing
         Thread th = new Thread(new Runnable() {
             @Override
@@ -159,6 +162,7 @@ public class KafkaEntranceProcessorTest {
                 for (i = 0; i < NUM_INSTANCES; i++) {
                     try {
                         InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header);
+                                             
                         ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes());
                         long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset();
                     } catch (InterruptedException | ExecutionException | TimeoutException ex) {
@@ -173,11 +177,10 @@ public class KafkaEntranceProcessorTest {
 
         int z = 0;
         while (z < NUM_INSTANCES && kep.hasNext()) {
-            InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
+            InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
             z++;
-//            logger.log(Level.INFO, "{0} {1}", new Object[]{z, event.getInstance().toString()});
         }
-
+        
         assertEquals("Number of sent and received instances", NUM_INSTANCES, z);
 
     }
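
A condensed sketch of the consumer-side wiring this test exercises. The broker address, topic name and timeout below are placeholders (the test builds its Properties via TestUtilsForKafka.getConsumerProperties and uses its own TOPIC_JSON/TIMEOUT constants); only the constructor call and the hasNext()/nextEvent() loop mirror the test code above.

import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.samoa.learners.InstanceContentEvent;
import org.apache.samoa.streams.kafka.KafkaEntranceProcessor;
import org.apache.samoa.streams.kafka.KafkaJsonMapper;

public class KafkaJsonConsumerSketch {
    public static void main(String[] args) {
        // Placeholder consumer configuration; a real run needs a reachable broker
        // and the consumer settings the test supplies via its helper.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // Plug the JSON mapper in as the deserializer for incoming records,
        // mirroring the call in testFetchingNewDataWithJson().
        KafkaEntranceProcessor kep = new KafkaEntranceProcessor(
                props, "samoa-test-json", 10000, new KafkaJsonMapper(Charset.defaultCharset()));
        kep.onCreate(1);

        // Drain events; each one is rebuilt by the custom Gson deserializer.
        int received = 0;
        while (received < 10 && kep.hasNext()) {
            InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent();
            received++;
        }
        System.out.println("Received " + received + " events");
    }
}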
