Sample serializer/deserializer for JSON and InstanceContentEvent
Updates in comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/d32cea11
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/d32cea11
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/d32cea11

Branch: refs/heads/master
Commit: d32cea1156507695fdc12c9f706ad332fc2b4788
Parents: c2a589d
Author: pwawrzyniak <[email protected]>
Authored: Fri Mar 17 12:09:52 2017 +0100
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../samoa/streams/kafka/KafkaDeserializer.java  | 64 ++++++++++---------
 .../streams/kafka/KafkaEntranceProcessor.java   |  1 -
 .../samoa/streams/kafka/KafkaJsonMapper.java    | 52 +++++++++++++++
 .../samoa/streams/kafka/KafkaSerializer.java    | 66 +++++++++++---------
 4 files changed, 121 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
index 2c7dae1..b85ec1f 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java
@@ -1,30 +1,34 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import org.apache.samoa.core.ContentEvent;
-
-/**
- *
- * @author pwawrzyniak
- * @param <T> the class that would be deserialized
- */
-public interface KafkaDeserializer<T extends ContentEvent> {
-    
-    // TODO: Consider key-value schema?
-    
-    T deserialize(byte[] message);
-}
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ *
+ * @author pwawrzyniak
+ * @param <T> the class that would be deserialized
+ */
+public interface KafkaDeserializer<T extends ContentEvent> {
+    
+    // TODO: Consider key-value schema?
+    /**
+     * Method that provides deserialization algorithm
+     * @param message Message as received from Apache Kafka
+     * @return Deserialized form of message, to be passed to topology
+     */
+    T deserialize(byte[] message);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index fe82212..d0a4c0d 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -85,7 +85,6 @@ public class KafkaEntranceProcessor implements 
EntranceProcessor {
     public ContentEvent nextEvent() {
         // assume this will never be called when buffer is empty!
         return this.deserializer.deserialize(buffer.remove(buffer.size() - 1));
-
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
new file mode 100644
index 0000000..6ede447
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import com.google.gson.Gson;
+import java.nio.charset.Charset;
+import org.apache.samoa.learners.InstanceContentEvent;
+
+/**
+ * Sample class for serializing and deserializing InsatnceContentEvent from/to 
JSON format
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaJsonMapper implements 
KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent>{
+
+    private final transient Gson gson;
+    private final Charset charset;
+
+    /**
+     * Class constructor
+     * @param charset Charset to be used for bytes parsing
+     */
+    public KafkaJsonMapper(Charset charset){
+        this.gson = new Gson();
+        this.charset = charset;
+    }
+    
+    @Override
+    public InstanceContentEvent deserialize(byte[] message) {
+        return gson.fromJson(new String(message, this.charset), 
InstanceContentEvent.class);
+    }
+
+    @Override
+    public byte[] serialize(InstanceContentEvent message) {
+        return gson.toJson(message).getBytes(this.charset);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
index 29f04ca..a8cc0b8 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java
@@ -1,31 +1,35 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import org.apache.samoa.core.ContentEvent;
-
-/**
- *
- * @author pwawrzyniak
- * @param <T> the class that would be serialized
- */
-public interface KafkaSerializer<T extends ContentEvent> {
-    
-    // TODO: Consider Key-Value schema?
-    
-    
-    byte[] serialize(T message);
-}
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import org.apache.samoa.core.ContentEvent;
+
+/**
+ *
+ * @author pwawrzyniak
+ * @param <T> the class that would be serialized
+ */
+public interface KafkaSerializer<T extends ContentEvent> {
+    
+    // TODO: Consider Key-Value schema?
+    
+    /**
+     * Method that provides serialization algorithm
+     * @param message Message received from topology, to be serialized
+     * @return Serialized form of the message
+     */
+    byte[] serialize(T message);
+}

Reply via email to