Repository: samza Updated Branches: refs/heads/master b989e51ae -> 357d6ca72
SAMZA-1435: Changed samza-api Serde implementations from Scala to Java. Author: Prateek Maheshwari <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #316 from prateekm/java-serdes Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/357d6ca7 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/357d6ca7 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/357d6ca7 Branch: refs/heads/master Commit: 357d6ca72c73908d38f92aece05fd1e468c53cbe Parents: b989e51 Author: Prateek Maheshwari <[email protected]> Authored: Fri Oct 6 10:47:37 2017 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Fri Oct 6 10:47:37 2017 -0700 ---------------------------------------------------------------------- build.gradle | 15 +-- .../samza/serializers/ByteBufferSerde.java | 46 ++++++++ .../serializers/ByteBufferSerdeFactory.java | 31 +++++ .../org/apache/samza/serializers/ByteSerde.java | 34 ++++++ .../samza/serializers/ByteSerdeFactory.java | 29 +++++ .../apache/samza/serializers/DoubleSerde.java | 45 ++++++++ .../samza/serializers/DoubleSerdeFactory.java | 29 +++++ .../apache/samza/serializers/IntegerSerde.java | 45 ++++++++ .../samza/serializers/IntegerSerdeFactory.java | 29 +++++ .../apache/samza/serializers/JsonSerdeV2.java | 115 +++++++++++++++++++ .../samza/serializers/JsonSerdeV2Factory.java | 28 +++++ .../org/apache/samza/serializers/KVSerde.java | 88 ++++++++++++++ .../org/apache/samza/serializers/LongSerde.java | 45 ++++++++ .../samza/serializers/LongSerdeFactory.java | 29 +++++ .../org/apache/samza/serializers/NoOpSerde.java | 40 +++++++ .../samza/serializers/SerializableSerde.java | 85 ++++++++++++++ .../serializers/SerializableSerdeFactory.java | 31 +++++ .../apache/samza/serializers/StringSerde.java | 64 +++++++++++ .../samza/serializers/StringSerdeFactory.java | 29 +++++ .../org/apache/samza/serializers/UUIDSerde.java | 49 ++++++++ 
.../samza/serializers/UUIDSerdeFactory.java | 31 +++++ .../samza/system/IncomingMessageEnvelope.java | 2 +- .../ByteBufferSerde.scala | 48 -------- .../ByteSerde.scala | 36 ------ .../DoubleSerde.scala | 45 -------- .../IntegerSerde.scala | 45 -------- .../JsonSerdeV2.scala | 91 --------------- .../org.apache.samza.serializers/KVSerde.scala | 69 ----------- .../LongSerde.scala | 45 -------- .../NoOpSerde.scala | 37 ------ .../SerializableSerde.scala | 67 ----------- .../StringSerde.scala | 49 -------- .../UUIDSerde.scala | 47 -------- 33 files changed, 926 insertions(+), 592 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index e0129f9..7ff75ec 100644 --- a/build.gradle +++ b/build.gradle @@ -88,7 +88,8 @@ rat { 'samza-test/src/main/resources/**', 'samza-hdfs/src/main/resources/**', 'samza-hdfs/src/test/resources/**', - 'out/**' + 'out/**', + 'state/**' ] } @@ -121,22 +122,12 @@ subprojects { } project(':samza-api') { - apply plugin: 'scala' apply plugin: 'checkstyle' - - // Force scala joint compilation - sourceSets.main.scala.srcDir "src/main/java" - sourceSets.test.scala.srcDir "src/test/java" - - // Disable the Javac compiler by forcing joint compilation by scalac. 
This is equivalent to setting - // tasks.compileTestJava.enabled = false - sourceSets.main.java.srcDirs = [] - sourceSets.test.java.srcDirs = [] + apply plugin: 'java' dependencies { compile "org.slf4j:slf4j-api:$slf4jVersion" compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" - compile "org.scala-lang:scala-library:$scalaLibVersion" testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-all:$mockitoVersion" } http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java new file mode 100644 index 0000000..c2ea594 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import java.nio.ByteBuffer; + +/** + * A serializer for ByteBuffers. 
+ */ +public class ByteBufferSerde implements Serde<ByteBuffer> { + + public byte[] toBytes(ByteBuffer byteBuffer) { + if (byteBuffer != null) { + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.duplicate().get(bytes); + return bytes; + } else { + return null; + } + } + + public ByteBuffer fromBytes(byte[] bytes) { + if (bytes != null) { + return ByteBuffer.wrap(bytes); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java new file mode 100644 index 0000000..0fcaa86 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +import java.nio.ByteBuffer; + +public class ByteBufferSerdeFactory implements SerdeFactory<ByteBuffer> { + + public Serde<ByteBuffer> getSerde(String name, Config config) { + return new ByteBufferSerde(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java new file mode 100644 index 0000000..ea7156c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +/** + * A serializer for bytes that is effectively a pass-through, but can be useful for binary messages. 
+ */ +public class ByteSerde implements Serde<byte[]> { + + public byte[] toBytes(byte[] bytes) { + return bytes; + } + + public byte[] fromBytes(byte[] bytes) { + return bytes; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java new file mode 100644 index 0000000..5c4ad05 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +public class ByteSerdeFactory implements SerdeFactory<byte[]> { + + public Serde<byte[]> getSerde(String name, Config config) { + return new ByteSerde(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java new file mode 100644 index 0000000..39de72b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import java.nio.ByteBuffer; + +/** + * A serializer for doubles + */ +public class DoubleSerde implements Serde<Double> { + + public byte[] toBytes(Double obj) { + if (obj != null) { + return ByteBuffer.allocate(8).putDouble(obj).array(); + } else { + return null; + } + } + + // big-endian by default + public Double fromBytes(byte[] bytes) { + if (bytes != null) { + return ByteBuffer.wrap(bytes).getDouble(); + } else { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java new file mode 100644 index 0000000..18247d6 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +public class DoubleSerdeFactory implements SerdeFactory<Double> { + + public Serde<Double> getSerde(String name, Config config) { + return new DoubleSerde(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java new file mode 100644 index 0000000..d2716b0 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import java.nio.ByteBuffer; + +/** + * A serializer for integers + */ +public class IntegerSerde implements Serde<Integer> { + + public byte[] toBytes(Integer obj) { + if (obj != null) { + return ByteBuffer.allocate(4).putInt(obj).array(); + } else { + return null; + } + } + + // big-endian by default + public Integer fromBytes(byte[] bytes) { + if (bytes != null) { + return ByteBuffer.wrap(bytes).getInt(); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java new file mode 100644 index 0000000..34f8d55 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +public class IntegerSerdeFactory implements SerdeFactory<Integer> { + + public Serde<Integer> getSerde(String name, Config config) { + return new IntegerSerde(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java new file mode 100644 index 0000000..d5b0022 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import org.apache.samza.SamzaException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.UnsupportedEncodingException; + +/** + * A serializer for UTF-8 encoded JSON strings. 
JsonSerdeV2 differs from JsonSerde in that: + * <ol> + * <li> + * It allows specifying the specific POJO type to deserialize to (using JsonSerdeV2(Class<T>) + * or JsonSerdeV2#of(Class<T>)). JsonSerde always returns a LinkedHashMap<String, Object> + * upon deserialization. + * <li> + * It uses Jackson's default 'camelCase' property naming convention, which simplifies defining + * the POJO to bind to. JsonSerde enforces the 'dash-separated' property naming convention. + * </ol> + * This JsonSerdeV2 should be preferred over JsonSerde for High Level API applications, unless + * backwards compatibility with the older data format (with dasherized names) is required. + * + * @param <T> the type of the POJO being (de)serialized. + */ +public class JsonSerdeV2<T> implements Serde<T> { + + private static final Logger LOG = LoggerFactory.getLogger(JsonSerdeV2.class); + private final Class<T> clazz; + private transient ObjectMapper mapper = new ObjectMapper(); + + /** + * Constructs a JsonSerdeV2 that returns a LinkedHashMap<String, Object> upon deserialization. + */ + public JsonSerdeV2() { + this(null); + } + + /** + * Constructs a JsonSerdeV2 that (de)serializes POJOs of class {@code clazz}. + * + * @param clazz the class of the POJO being (de)serialized. 
+ */ + public JsonSerdeV2(Class<T> clazz) { + this.clazz = clazz; + } + + public static <T> JsonSerdeV2<T> of(Class<T> clazz) { + return new JsonSerdeV2<>(clazz); + } + + public byte[] toBytes(T obj) { + if (obj != null) { + try { + String str = mapper.writeValueAsString(obj); + return str.getBytes("UTF-8"); + } catch (Exception e) { + throw new SamzaException("Error serializing data.", e); + } + } else { + return null; + } + } + + public T fromBytes(byte[] bytes) { + if (bytes != null) { + String str; + try { + str = new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw new SamzaException("Error deserializing data", e); + } + + try { + if (clazz != null) { + return mapper.readValue(str, clazz); + } else { + return mapper.readValue(str, new TypeReference<T>() { }); + } + } catch (Exception e) { + LOG.debug("Error deserializing data: " + str, e); + throw new SamzaException("Error deserializing data", e); + } + } else { + return null; + } + } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + this.mapper = new ObjectMapper(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java new file mode 100644 index 0000000..2058e6b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +public class JsonSerdeV2Factory implements SerdeFactory<Object> { + public JsonSerdeV2<Object> getSerde(String name, Config config) { + return new JsonSerdeV2<>(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java new file mode 100644 index 0000000..10f224b --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import org.apache.samza.operators.KV; + +import java.nio.ByteBuffer; + + +/** + * A marker serde class to indicate that messages are keyed and should be deserialized as K-V pairs. This class is + * intended for use cases where a single Serde parameter or configuration is required. + * + * @param <K> type of the key in the message + * @param <V> type of the value in the message + */ +public class KVSerde<K, V> implements Serde<KV<K, V>> { + + private final Serde<K> keySerde; + private final Serde<V> valueSerde; + + public KVSerde(Serde<K> keySerde, Serde<V> valueSerde) { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + } + + public static <K, V> KVSerde<K, V> of(Serde<K> keySerde, Serde<V> valueSerde) { + return new KVSerde<>(keySerde, valueSerde); + } + + public KV<K, V> fromBytes(byte[] bytes) { + if (bytes != null) { + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + int keyLength = byteBuffer.getInt(); + byte[] keyBytes = new byte[keyLength]; + byteBuffer.get(keyBytes); + int valueLength = byteBuffer.getInt(); + byte[] valueBytes = new byte[valueLength]; + byteBuffer.get(valueBytes); + K key = keySerde.fromBytes(keyBytes); + V value = valueSerde.fromBytes(valueBytes); + return KV.of(key, value); + } else { + return null; + } + } + + public byte[] toBytes(KV<K, V> obj) { + if (obj != null) { + byte[] keyBytes = keySerde.toBytes(obj.key); + byte[] valueBytes = valueSerde.toBytes(obj.value); + byte[] bytes = new byte[8 + keyBytes.length + 8 + valueBytes.length]; + ByteBuffer byteBuffer 
= ByteBuffer.wrap(bytes); + byteBuffer.putInt(keyBytes.length); + byteBuffer.put(keyBytes); + byteBuffer.putInt(valueBytes.length); + byteBuffer.put(valueBytes); + return byteBuffer.array(); + } else { + return null; + } + } + + public Serde<K> getKeySerde() { + return this.keySerde; + } + + public Serde<V> getValueSerde() { + return this.valueSerde; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java new file mode 100644 index 0000000..fb61380 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import java.nio.ByteBuffer; + +/** + * A serializer for longs + */ +public class LongSerde implements Serde<Long> { + + public byte[] toBytes(Long obj) { + if (obj != null) { + return ByteBuffer.allocate(8).putLong(obj).array(); + } else { + return null; + } + } + + // big-endian by default + public Long fromBytes(byte[] bytes) { + if (bytes != null) { + return ByteBuffer.wrap(bytes).getLong(); + } else { + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java new file mode 100644 index 0000000..e2ae811 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +public class LongSerdeFactory implements SerdeFactory<Long> { + + public LongSerde getSerde(String name, Config config) { + return new LongSerde(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java new file mode 100644 index 0000000..b7faef9 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.serializers; + +import org.apache.samza.SamzaException; + +/** + * A marker serde class to indicate that messages should not be serialized or deserialized. This is the same behavior as + * when no serde is provided, and is intended for use cases where a Serde parameter or configuration is required. This + * is different than ByteSerde which is a pass-through serde for byte arrays. 
+ * + * @param <T> type of messages which should not be serialized or deserialized + */ +public class NoOpSerde<T> implements Serde<T> { + + public T fromBytes(byte[] bytes) { + throw new SamzaException("NoOpSerde fromBytes should not be invoked by the framework."); + } + + + public byte[] toBytes(T obj) { + throw new SamzaException("NoOpSerde toBytes should not be invoked by the framework."); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java new file mode 100644 index 0000000..d70746c --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.serializers; + + +import org.apache.samza.SamzaException; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; + +/** + * A serializer for Serializable objects + */ +public class SerializableSerde<T extends Serializable> implements Serde<T> { + + public byte[] toBytes(T obj) { + if (obj != null) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = null; + try { + oos = new ObjectOutputStream(bos); + oos.writeObject(obj); + } catch (IOException e) { + throw new SamzaException("Error writing to output stream", e); + } finally { + try { + if (oos != null) { + oos.close(); + } + } catch (IOException e) { + throw new SamzaException("Error closing output stream", e); + } + } + + return bos.toByteArray(); + } else { + return null; + } + } + + public T fromBytes(byte[] bytes) { + if (bytes != null) { + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + ObjectInputStream ois = null; + + try { + ois = new ObjectInputStream(bis); + return (T) ois.readObject(); + } catch (IOException | ClassNotFoundException e) { + throw new SamzaException("Error reading from input stream."); + } finally { + try { + if (ois != null) { + ois.close(); + } + } catch (IOException e) { + throw new SamzaException("Error closing input stream", e); + } + } + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java new file mode 100644 index 0000000..2bc78b5 --- /dev/null +++ 
b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +import java.io.Serializable; + +public class SerializableSerdeFactory<T extends Serializable> implements SerdeFactory<T> { + + public Serde<T> getSerde(String name, Config config) { + return new SerializableSerde<>(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java new file mode 100644 index 0000000..c2c2874 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import org.apache.samza.SamzaException; + +import java.io.UnsupportedEncodingException; + +/** + * A serializer for strings + */ +public class StringSerde implements Serde<String> { + + private final String encoding; + + public StringSerde(String encoding) { + this.encoding = encoding; + } + + public StringSerde() { + this("UTF-8"); + } + + public byte[] toBytes(String obj) { + if (obj != null) { + try { + return obj.getBytes(encoding); + } catch (UnsupportedEncodingException e) { + throw new SamzaException("Unsupported encoding " + encoding, e); + } + } else { + return null; + } + } + + public String fromBytes(byte[] bytes) { + if (bytes != null) { + try { + return new String(bytes, 0, bytes.length, encoding); + } catch (UnsupportedEncodingException e) { + throw new SamzaException("Unsupported encoding " + encoding, e); + } + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java new file mode 100644 index 0000000..2f61fe1 --- /dev/null +++ 
b/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +public class StringSerdeFactory implements SerdeFactory<String> { + + public Serde<String> getSerde(String name, Config config) { + return new StringSerde(config.get("encoding", "UTF-8")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java new file mode 100644 index 0000000..3cd681f --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import java.nio.ByteBuffer; +import java.util.UUID; + +/** + * A serializer for UUID + */ +public class UUIDSerde implements Serde<UUID> { + + public byte[] toBytes(UUID obj) { + if (obj != null) { + return ByteBuffer.allocate(16) + .putLong(obj.getMostSignificantBits()) + .putLong(obj.getLeastSignificantBits()) + .array(); + } else { + return null; + } + } + + public UUID fromBytes(byte[] bytes) { + if (bytes != null) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + return new UUID(buffer.getLong(), buffer.getLong()); + } else { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java new file mode 100644 index 0000000..4394824 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers; + +import org.apache.samza.config.Config; + +import java.util.UUID; + +public class UUIDSerdeFactory implements SerdeFactory<UUID> { + + public Serde<UUID> getSerde(String name, Config config) { + return new UUIDSerde(); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java index 96fa81c..60a605b 100644 --- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java +++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java @@ -22,7 +22,7 @@ package org.apache.samza.system; import java.nio.charset.Charset; /** - * This class represents a message entvelope that is received by a StreamTask for each message that is received from a + * This class represents a message envelope that is received by a StreamTask for each message that is received from a * partition of a specific input stream. 
*/ public class IncomingMessageEnvelope { http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala deleted file mode 100644 index adb8781..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import org.apache.samza.config.Config -import java.nio.ByteBuffer - -/** - * A serializer for ByteBuffers. 
- */ -class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] { - def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde -} - -class ByteBufferSerde extends Serde[ByteBuffer] { - def toBytes(byteBuffer: ByteBuffer) = { - if (byteBuffer != null) { - val bytes = new Array[Byte](byteBuffer.remaining()) - byteBuffer.duplicate().get(bytes) - bytes - } else { - null - } - } - - def fromBytes(bytes: Array[Byte]) = if (bytes != null) { - ByteBuffer.wrap(bytes) - } else { - null - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala deleted file mode 100644 index 968da26..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.serializers - -import org.apache.samza.config.Config - -/** - * A serializer for bytes that is effectively a no-op but can be useful for - * binary messages. - */ -class ByteSerdeFactory extends SerdeFactory[Array[Byte]] { - def getSerde(name: String, config: Config): Serde[Array[Byte]] = new ByteSerde -} - -class ByteSerde extends Serde[Array[Byte]] { - def toBytes(bytes: Array[Byte]) = bytes - - def fromBytes(bytes: Array[Byte]) = bytes -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala deleted file mode 100644 index 7981d2c..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.serializers - -import java.nio.ByteBuffer -import org.apache.samza.config.Config - -/** - * A serializer for doubles - */ -class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] { - def getSerde(name: String, config: Config): Serde[java.lang.Double] = new DoubleSerde -} - -class DoubleSerde extends Serde[java.lang.Double] { - def toBytes(obj: java.lang.Double): Array[Byte] = if (obj != null) { - ByteBuffer.allocate(8).putDouble(obj.doubleValue()).array - } else { - null - } - - // big-endian by default - def fromBytes(bytes: Array[Byte]): java.lang.Double = if (bytes != null) { - ByteBuffer.wrap(bytes).getDouble - } else { - null - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala deleted file mode 100644 index 46509f7..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.nio.ByteBuffer -import org.apache.samza.config.Config - -/** - * A serializer for integers - */ -class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] { - def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new IntegerSerde -} - -class IntegerSerde extends Serde[java.lang.Integer] { - def toBytes(obj: java.lang.Integer): Array[Byte] = if (obj != null) { - ByteBuffer.allocate(4).putInt(obj.intValue).array - } else { - null - } - - // big-endian by default - def fromBytes(bytes: Array[Byte]): java.lang.Integer = if (bytes != null) { - ByteBuffer.wrap(bytes).getInt - } else { - null - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala b/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala deleted file mode 100644 index 446035c..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import org.apache.samza.SamzaException -import org.apache.samza.config.Config -import org.codehaus.jackson.`type`.TypeReference -import org.codehaus.jackson.map.ObjectMapper -import org.slf4j.LoggerFactory - - -/** - * A serializer for JSON strings. JsonSerdeV2 differs from JsonSerde in that: - * <ol> - * <li> - * It allows specifying the specific POJO type to deserialize to (using JsonSerdeV2(Class[T]) - * or JsonSerdeV2#of(Class[T]). JsonSerde always returns a LinkedHashMap<String, Object> upon deserialization. - * <li> - * It uses Jackson's default 'camelCase' property naming convention, which simplifies defining - * the POJO to bind to. JsonSerde enforces the 'dash-separated' property naming convention. - * </ol> - * This JsonSerdeV2 should be preferred over JsonSerde for High Level API applications, unless - * backwards compatibility with the older data format (with dasherized names) is required. - * - * @param clazzOption the class of the POJO being (de)serialized. If this is None, - * a LinkedHashMap<String, Object> is returned upon deserialization. - * @tparam T the type of the POJO being (de)serialized. 
- */ -class JsonSerdeV2[T] private(clazzOption: Option[Class[T]]) extends Serde[T] { - private val LOG = LoggerFactory.getLogger(classOf[JsonSerdeV2[T]]) - @transient lazy private val mapper = new ObjectMapper() - - def this() { - this(None) - } - - def this(clazz: Class[T]) { - this(Option(clazz)) - } - - def toBytes(obj: T): Array[Byte] = { - try { - val str = mapper.writeValueAsString(obj) - str.getBytes("UTF-8") - } catch { - case e: Exception => throw new SamzaException(e); - } - } - - def fromBytes(bytes: Array[Byte]): T = { - val str = new String(bytes, "UTF-8") - try { - clazzOption match { - case Some(clazz) => mapper.readValue(str, clazz) - case None => mapper.readValue(str, new TypeReference[T]() {}) - } - } catch { - case e: Exception => - LOG.debug(s"Error deserializing message: $str", e) - throw new SamzaException(e) - } - } - -} - -object JsonSerdeV2 { - def of[T](clazz: Class[T]): JsonSerdeV2[T] = { - new JsonSerdeV2[T](clazz) - } -} - -class JsonSerdeV2Factory extends SerdeFactory[Object] { - def getSerde(name: String, config: Config) = new JsonSerdeV2 -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala deleted file mode 100644 index f5cd5cf..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.nio.ByteBuffer - -import org.apache.samza.operators.KV - -object KVSerde { - def of[K, V](keySerde: Serde[K], valueSerde: Serde[V]) = new KVSerde[K, V](keySerde, valueSerde) -} - -/** - * A serde for [[KV]] key-value pairs. - * - * When this serde is used for streams in the High Level API, Samza wires up and uses the provided - * keySerde and valueSerde for the keys and values in the stream separately. I.e., the fromBytes and toBytes - * methods in this class aren't used directly for streams. - * - * @tparam K type of the key in the message - * @tparam V type of the value in the message - */ -class KVSerde[K, V](keySerde: Serde[K], valueSerde: Serde[V]) extends Serde[KV[K, V]] { - override def fromBytes(bytes: Array[Byte]): KV[K, V] = { - val byteBuffer = ByteBuffer.wrap(bytes) - val keyLength = byteBuffer.getInt() - val keyBytes = new Array[Byte](keyLength) - byteBuffer.get(keyBytes) - val valueLength = byteBuffer.getInt() - val valueBytes = new Array[Byte](valueLength) - byteBuffer.get(valueBytes) - val key = keySerde.fromBytes(keyBytes) - val value = valueSerde.fromBytes(valueBytes) - KV.of(key, value) - } - - override def toBytes(obj: KV[K, V]): Array[Byte] = { - val keyBytes = keySerde.toBytes(obj.key) - val valueBytes = valueSerde.toBytes(obj.value) - val bytes = new Array[Byte](8 + keyBytes.length + 8 + valueBytes.length) - val byteBuffer = ByteBuffer.wrap(bytes) - byteBuffer.putInt(keyBytes.length) - byteBuffer.put(keyBytes) - byteBuffer.putInt(valueBytes.length) - 
byteBuffer.put(valueBytes) - byteBuffer.array() - } - - def getKeySerde: Serde[K] = keySerde - - def getValueSerde: Serde[V] = valueSerde -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala deleted file mode 100644 index 41ff598..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.samza.serializers - -import java.nio.ByteBuffer -import org.apache.samza.config.Config - -/** - * A serializer for longs - */ -class LongSerdeFactory extends SerdeFactory[java.lang.Long] { - def getSerde(name: String, config: Config): Serde[java.lang.Long] = new LongSerde -} - -class LongSerde extends Serde[java.lang.Long] { - def toBytes(obj: java.lang.Long): Array[Byte] = if (obj != null) { - ByteBuffer.allocate(8).putLong(obj.longValue()).array - } else { - null - } - - // big-endian by default - def fromBytes(bytes: Array[Byte]): java.lang.Long = if (bytes != null) { - ByteBuffer.wrap(bytes).getLong - } else { - null - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala deleted file mode 100644 index c656526..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.serializers - -/** - * A marker serde class to indicate that messages should not be serialized or deserialized. - * This is the same behavior as when no serde is provided, and is intended for use cases where - * a Serde parameter or configuration is required. - * This is different than [[ByteSerde]] which is a pass-through serde for byte arrays. - * - * @tparam T type of messages which should not be serialized or deserialized - */ -class NoOpSerde[T] extends Serde[T] { - - override def fromBytes(bytes: Array[Byte]): T = - throw new NotImplementedError("NoOpSerde fromBytes should not be invoked by the framework.") - - override def toBytes(obj: T): Array[Byte] = - throw new NotImplementedError("NoOpSerde toBytes should not be invoked by the framework.") - -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala deleted file mode 100644 index c43f863..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.io.ByteArrayInputStream -import java.io.ByteArrayOutputStream -import java.io.ObjectInputStream -import java.io.ObjectOutputStream - -import org.apache.samza.config.Config - -/** - * A serializer for Serializable - */ -class SerializableSerdeFactory[T <: java.io.Serializable] extends SerdeFactory[T] { - def getSerde(name: String, config: Config): Serde[T] = - new SerializableSerde[T] -} - -class SerializableSerde[T <: java.io.Serializable] extends Serde[T] { - def toBytes(obj: T): Array[Byte] = if (obj != null) { - val bos = new ByteArrayOutputStream - val oos = new ObjectOutputStream(bos) - - try { - oos.writeObject(obj) - } - finally { - oos.close() - } - - bos.toByteArray - } else { - null - } - - def fromBytes(bytes: Array[Byte]): T = if (bytes != null) { - val bis = new ByteArrayInputStream(bytes) - val ois = new ObjectInputStream(bis) - - try { - ois.readObject.asInstanceOf[T] - } - finally{ - ois.close() - } - } else { - null.asInstanceOf[T] - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala deleted file mode 100644 index c69c402..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import org.apache.samza.config.Config - -/** - * A serializer for strings - */ -class StringSerdeFactory extends SerdeFactory[String] { - def getSerde(name: String, config: Config): Serde[String] = - new StringSerde(config.get("encoding", "UTF-8")) -} - -class StringSerde(val encoding: String) extends Serde[String] { - // constructor (for Java) that defaults to UTF-8 encoding - def this() { - this("UTF-8") - } - - def toBytes(obj: String): Array[Byte] = if (obj != null) { - obj.toString.getBytes(encoding) - } else { - null - } - - def fromBytes(bytes: Array[Byte]): String = if (bytes != null) { - new String(bytes, 0, bytes.size, encoding) - } else { - null - } -} http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala ---------------------------------------------------------------------- diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala deleted file mode 100644 index 88d4327..0000000 --- a/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala +++ 
/dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.nio.ByteBuffer -import java.util.UUID - -import org.apache.samza.config.Config - -/** - * A serializer for UUID - */ -class UUIDSerdeFactory extends SerdeFactory[UUID] { - def getSerde(name: String, config: Config): Serde[UUID] = new UUIDSerde -} - -class UUIDSerde() extends Serde[UUID] { - def toBytes(obj: UUID): Array[Byte] = if (obj != null) { - ByteBuffer.allocate(16).putLong(obj.getMostSignificantBits).putLong(obj.getLeastSignificantBits).array - } else { - null - } - - def fromBytes(bytes: Array[Byte]): UUID = if (bytes != null) { - val buffer = ByteBuffer.wrap(bytes) - new UUID(buffer.getLong, buffer.getLong) - } else { - null - } -}
