Sample serializer/deserializer for JSON and InstanceContentEvent. Updates in comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/d32cea11 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/d32cea11 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/d32cea11 Branch: refs/heads/master Commit: d32cea1156507695fdc12c9f706ad332fc2b4788 Parents: c2a589d Author: pwawrzyniak <[email protected]> Authored: Fri Mar 17 12:09:52 2017 +0100 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../samoa/streams/kafka/KafkaDeserializer.java | 64 ++++++++++--------- .../streams/kafka/KafkaEntranceProcessor.java | 1 - .../samoa/streams/kafka/KafkaJsonMapper.java | 52 +++++++++++++++ .../samoa/streams/kafka/KafkaSerializer.java | 66 +++++++++++--------- 4 files changed, 121 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java index 2c7dae1..b85ec1f 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDeserializer.java @@ -1,30 +1,34 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import org.apache.samoa.core.ContentEvent; - -/** - * - * @author pwawrzyniak - * @param <T> the class that would be deserialized - */ -public interface KafkaDeserializer<T extends ContentEvent> { - - // TODO: Consider key-value schema? - - T deserialize(byte[] message); -} +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param <T> the class that would be deserialized + */ +public interface KafkaDeserializer<T extends ContentEvent> { + + // TODO: Consider key-value schema? 
+ /** + * Method that provides deserialization algorithm + * @param message Message as received from Apache Kafka + * @return Deserialized form of message, to be passed to topology + */ + T deserialize(byte[] message); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index fe82212..d0a4c0d 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -85,7 +85,6 @@ public class KafkaEntranceProcessor implements EntranceProcessor { public ContentEvent nextEvent() { // assume this will never be called when buffer is empty! return this.deserializer.deserialize(buffer.remove(buffer.size() - 1)); - } @Override http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java new file mode 100644 index 0000000..6ede447 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaJsonMapper.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import com.google.gson.Gson; +import java.nio.charset.Charset; +import org.apache.samoa.learners.InstanceContentEvent; + +/** + * Sample class for serializing and deserializing InstanceContentEvent from/to JSON format + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaJsonMapper implements KafkaDeserializer<InstanceContentEvent>, KafkaSerializer<InstanceContentEvent>{ + + private final transient Gson gson; + private final Charset charset; + + /** + * Class constructor + * @param charset Charset to be used for bytes parsing + */ + public KafkaJsonMapper(Charset charset){ + this.gson = new Gson(); + this.charset = charset; + } + + @Override + public InstanceContentEvent deserialize(byte[] message) { + return gson.fromJson(new String(message, this.charset), InstanceContentEvent.class); + } + + @Override + public byte[] serialize(InstanceContentEvent message) { + return gson.toJson(message).getBytes(this.charset); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/d32cea11/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java index 29f04ca..a8cc0b8 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java +++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaSerializer.java @@ -1,31 +1,35 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import org.apache.samoa.core.ContentEvent; - -/** - * - * @author pwawrzyniak - * @param <T> the class that would be serialized - */ -public interface KafkaSerializer<T extends ContentEvent> { - - // TODO: Consider Key-Value schema? - - - byte[] serialize(T message); -} +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import org.apache.samoa.core.ContentEvent; + +/** + * + * @author pwawrzyniak + * @param <T> the class that would be serialized + */ +public interface KafkaSerializer<T extends ContentEvent> { + + // TODO: Consider Key-Value schema? 
+ + /** + * Method that provides serialization algorithm + * @param message Message received from topology, to be serialized + * @return Serialized form of the message + */ + byte[] serialize(T message); +}
