Changes in JSON mapper Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/fb17e7f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/fb17e7f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/fb17e7f8
Branch: refs/heads/master Commit: fb17e7f8304f08b1fc20678ee5c215b4fd8b3d9c Parents: 3fbfc07 Author: pwawrzyniak <[email protected]> Authored: Tue May 16 15:00:33 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../streams/kafka/KafkaConsumerThread.java | 23 ++++++++- .../samoa/streams/kafka/KafkaJsonMapper.java | 49 +++++++++++++------- .../kafka/KafkaEntranceProcessorTest.java | 13 ++++-- 3 files changed, 63 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/fb17e7f8/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java index 6522f67..a93986e 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaConsumerThread.java @@ -15,6 +15,27 @@ */ package org.apache.samoa.streams.kafka; +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2017 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + + import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -28,7 +49,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; /** * - * @author pwawrzyniak <your.name at your.org> + * @author pwawrzyniak */ class KafkaConsumerThread extends Thread { http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/fb17e7f8/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java index 1996b40..2ac3e04 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java @@ -34,39 +34,43 @@ package org.apache.samoa.streams.kafka; * limitations under the License. * #L% */ - - import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.InstanceCreator; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; import java.lang.reflect.Type; import java.nio.charset.Charset; -import java.util.logging.Level; -import java.util.logging.Logger; +import org.apache.samoa.instances.DenseInstanceData; import org.apache.samoa.instances.InstanceData; -import org.apache.samoa.instances.SingleClassInstanceData; import org.apache.samoa.learners.InstanceContentEvent; /** - * Sample class for serializing and deserializing {@link InstanceContentEvent} from/to JSON format + * Sample class for serializing and deserializing {@link InstanceContentEvent} + * from/to JSON format + * * @author pwawrzyniak * @version 0.5.0-incubating-SNAPSHOT * @since 0.5.0-incubating */ -public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent>{ +public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent> { private final transient Gson gson; private final Charset charset; /** * Class constructor + * * @param charset Charset to be used for bytes parsing */ - public KafkaJsonMapper(Charset charset){ - this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCreator()).create(); + public KafkaJsonMapper(Charset charset) { + this.gson = new GsonBuilder().registerTypeAdapter(InstanceData.class, new InstanceDataCustomDeserializer()).create(); this.charset = charset; } - + @Override public InstanceContentEvent deserialize(byte[] message) { return gson.fromJson(new String(message, this.charset), InstanceContentEvent.class); @@ -76,14 +80,27 @@ public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, public byte[] serialize(InstanceContentEvent message) { return gson.toJson(message).getBytes(this.charset); } - - public class InstanceDataCreator implements InstanceCreator<InstanceData>{ + + //Unused + public class InstanceDataCreator implements InstanceCreator<InstanceData> { + + @Override + public InstanceData createInstance(Type type) { + return new DenseInstanceData(); + } + } + + public class InstanceDataCustomDeserializer implements JsonDeserializer<InstanceData> { @Override - public InstanceData createInstance(Type type) { - return new SingleClassInstanceData(); + public DenseInstanceData deserialize(JsonElement je, Type type, JsonDeserializationContext jdc) throws JsonParseException { + double[] attributeValues = null; + JsonObject obj = (JsonObject) je; + attributeValues = jdc.deserialize(obj.get("attributeValues"), double[].class); + DenseInstanceData did = new DenseInstanceData(attributeValues); + return did; } - + } - + } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/fb17e7f8/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java index 009a6a7..933ba2a 100644 --- a/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java +++ b/samoa-api/src/test/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessorTest.java @@ -38,6 +38,8 @@ import com.google.gson.Gson; import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutionException; @@ -70,6 +72,7 @@ import org.apache.samoa.instances.InstancesHeader; /** * * @author pwawrzyniak + * @author Jakub Jankowski */ public class KafkaEntranceProcessorTest { @@ -137,7 +140,7 @@ public class KafkaEntranceProcessorTest { @Test public void testFetchingNewDataWithJson() throws InterruptedException, ExecutionException, TimeoutException { - Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); + final Logger logger = Logger.getLogger(KafkaEntranceProcessorTest.class.getName()); logger.log(Level.INFO, "JSON"); logger.log(Level.INFO, "testFetchingNewDataWithJson"); Properties props = TestUtilsForKafka.getConsumerProperties(BROKERHOST, BROKERPORT); @@ -145,7 +148,7 @@ public class KafkaEntranceProcessorTest { KafkaEntranceProcessor kep = new KafkaEntranceProcessor(props, TOPIC_JSON, TIMEOUT, new KafkaJsonMapper(Charset.defaultCharset())); kep.onCreate(1); - + // prepare new thread for data producing Thread th = new Thread(new Runnable() { @Override @@ -159,6 +162,7 @@ public class KafkaEntranceProcessorTest { for (i = 0; i < NUM_INSTANCES; i++) { try { InstanceContentEvent event = TestUtilsForKafka.getData(r, 10, header); + ProducerRecord<String, byte[]> record = new ProducerRecord(TOPIC_JSON, gson.toJson(event).getBytes()); long stat = producer.send(record).get(10, TimeUnit.SECONDS).offset(); } catch (InterruptedException | ExecutionException | TimeoutException ex) { @@ -173,11 +177,10 @@ public class KafkaEntranceProcessorTest { int z = 0; while (z < NUM_INSTANCES && kep.hasNext()) { - InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); + InstanceContentEvent event = (InstanceContentEvent) kep.nextEvent(); z++; -// logger.log(Level.INFO, "{0} {1}", new Object[]{z, event.getInstance().toString()}); } - + assertEquals("Number of sent and received instances", NUM_INSTANCES, z); }
