Repository: samza
Updated Branches:
  refs/heads/master b989e51ae -> 357d6ca72


SAMZA-1435: Changed samza-api Serde implementations from Scala to Java.

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes #316 from prateekm/java-serdes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/357d6ca7
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/357d6ca7
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/357d6ca7

Branch: refs/heads/master
Commit: 357d6ca72c73908d38f92aece05fd1e468c53cbe
Parents: b989e51
Author: Prateek Maheshwari <[email protected]>
Authored: Fri Oct 6 10:47:37 2017 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Fri Oct 6 10:47:37 2017 -0700

----------------------------------------------------------------------
 build.gradle                                    |  15 +--
 .../samza/serializers/ByteBufferSerde.java      |  46 ++++++++
 .../serializers/ByteBufferSerdeFactory.java     |  31 +++++
 .../org/apache/samza/serializers/ByteSerde.java |  34 ++++++
 .../samza/serializers/ByteSerdeFactory.java     |  29 +++++
 .../apache/samza/serializers/DoubleSerde.java   |  45 ++++++++
 .../samza/serializers/DoubleSerdeFactory.java   |  29 +++++
 .../apache/samza/serializers/IntegerSerde.java  |  45 ++++++++
 .../samza/serializers/IntegerSerdeFactory.java  |  29 +++++
 .../apache/samza/serializers/JsonSerdeV2.java   | 115 +++++++++++++++++++
 .../samza/serializers/JsonSerdeV2Factory.java   |  28 +++++
 .../org/apache/samza/serializers/KVSerde.java   |  88 ++++++++++++++
 .../org/apache/samza/serializers/LongSerde.java |  45 ++++++++
 .../samza/serializers/LongSerdeFactory.java     |  29 +++++
 .../org/apache/samza/serializers/NoOpSerde.java |  40 +++++++
 .../samza/serializers/SerializableSerde.java    |  85 ++++++++++++++
 .../serializers/SerializableSerdeFactory.java   |  31 +++++
 .../apache/samza/serializers/StringSerde.java   |  64 +++++++++++
 .../samza/serializers/StringSerdeFactory.java   |  29 +++++
 .../org/apache/samza/serializers/UUIDSerde.java |  49 ++++++++
 .../samza/serializers/UUIDSerdeFactory.java     |  31 +++++
 .../samza/system/IncomingMessageEnvelope.java   |   2 +-
 .../ByteBufferSerde.scala                       |  48 --------
 .../ByteSerde.scala                             |  36 ------
 .../DoubleSerde.scala                           |  45 --------
 .../IntegerSerde.scala                          |  45 --------
 .../JsonSerdeV2.scala                           |  91 ---------------
 .../org.apache.samza.serializers/KVSerde.scala  |  69 -----------
 .../LongSerde.scala                             |  45 --------
 .../NoOpSerde.scala                             |  37 ------
 .../SerializableSerde.scala                     |  67 -----------
 .../StringSerde.scala                           |  49 --------
 .../UUIDSerde.scala                             |  47 --------
 33 files changed, 926 insertions(+), 592 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index e0129f9..7ff75ec 100644
--- a/build.gradle
+++ b/build.gradle
@@ -88,7 +88,8 @@ rat {
     'samza-test/src/main/resources/**',
     'samza-hdfs/src/main/resources/**',
     'samza-hdfs/src/test/resources/**',
-    'out/**'
+    'out/**',
+    'state/**'
   ]
 }
 
@@ -121,22 +122,12 @@ subprojects {
 }
 
 project(':samza-api') {
-  apply plugin: 'scala'
   apply plugin: 'checkstyle'
-
-  // Force scala joint compilation
-  sourceSets.main.scala.srcDir "src/main/java"
-  sourceSets.test.scala.srcDir "src/test/java"
-
-  // Disable the Javac compiler by forcing joint compilation by scalac. This 
is equivalent to setting
-  // tasks.compileTestJava.enabled = false
-  sourceSets.main.java.srcDirs = []
-  sourceSets.test.java.srcDirs = []
+  apply plugin: 'java'
 
   dependencies {
     compile "org.slf4j:slf4j-api:$slf4jVersion"
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
-    compile "org.scala-lang:scala-library:$scalaLibVersion"
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java
new file mode 100644
index 0000000..c2ea594
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerde.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A serializer for ByteBuffers.
+ */
+public class ByteBufferSerde implements Serde<ByteBuffer> {
+
+  public byte[] toBytes(ByteBuffer byteBuffer) {
+    if (byteBuffer != null) {
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.duplicate().get(bytes);
+      return bytes;
+    } else {
+      return null;
+    }
+  }
+
+  public ByteBuffer fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      return ByteBuffer.wrap(bytes);
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java
 
b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java
new file mode 100644
index 0000000..0fcaa86
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/serializers/ByteBufferSerdeFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+import java.nio.ByteBuffer;
+
+public class ByteBufferSerdeFactory implements SerdeFactory<ByteBuffer> {
+
+  public Serde<ByteBuffer> getSerde(String name, Config config) {
+    return new ByteBufferSerde();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java
new file mode 100644
index 0000000..ea7156c
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerde.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+/**
+ * A serializer for bytes that is effectively a pass-through, but can be 
useful for binary messages.
+ */
+public class ByteSerde implements Serde<byte[]> {
+
+  public byte[] toBytes(byte[] bytes) {
+    return bytes;
+  }
+
+  public byte[] fromBytes(byte[] bytes) {
+    return bytes;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java 
b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java
new file mode 100644
index 0000000..5c4ad05
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/ByteSerdeFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+public class ByteSerdeFactory implements SerdeFactory<byte[]> {
+
+  public Serde<byte[]> getSerde(String name, Config config) {
+    return new ByteSerde();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java
new file mode 100644
index 0000000..39de72b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerde.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A serializer for doubles
+ */
+public class DoubleSerde implements Serde<Double> {
+
+  public byte[] toBytes(Double obj) {
+    if (obj != null) {
+      return ByteBuffer.allocate(8).putDouble(obj).array();
+    } else {
+      return null;
+    }
+  }
+
+  // big-endian by default
+  public Double fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      return ByteBuffer.wrap(bytes).getDouble();
+    } else {
+      return null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java 
b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java
new file mode 100644
index 0000000..18247d6
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/serializers/DoubleSerdeFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+public class DoubleSerdeFactory implements SerdeFactory<Double> {
+
+  public Serde<Double> getSerde(String name, Config config) {
+    return new DoubleSerde();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java
new file mode 100644
index 0000000..d2716b0
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerde.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A serializer for integers
+ */
+public class IntegerSerde implements Serde<Integer> {
+
+  public byte[] toBytes(Integer obj) {
+    if (obj != null) {
+      return ByteBuffer.allocate(4).putInt(obj).array();
+    } else {
+      return null;
+    }
+  }
+
+  // big-endian by default
+  public Integer fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      return ByteBuffer.wrap(bytes).getInt();
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java 
b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java
new file mode 100644
index 0000000..34f8d55
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/serializers/IntegerSerdeFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+public class IntegerSerdeFactory implements SerdeFactory<Integer> {
+
+  public Serde<Integer> getSerde(String name, Config config) {
+    return new IntegerSerde();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java 
b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java
new file mode 100644
index 0000000..d5b0022
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.SamzaException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.UnsupportedEncodingException;
+
+/**
+ * A serializer for UTF-8 encoded JSON strings. JsonSerdeV2 differs from 
JsonSerde in that:
+ * <ol>
+ *   <li>
+ *     It allows specifying the specific POJO type to deserialize to (using 
JsonSerdeV2(Class&lt;T&gt;)
+ *     or JsonSerdeV2#of(Class&lt;T&gt;). JsonSerde always returns a 
LinkedHashMap&lt;String, Object&gt;
+ *     upon deserialization.
+ *   <li>
+ *     It uses Jackson's default 'camelCase' property naming convention, which 
simplifies defining
+ *     the POJO to bind to. JsonSerde enforces the 'dash-separated' property 
naming convention.
+ * </ol>
+ * This JsonSerdeV2 should be preferred over JsonSerde for High Level API 
applications, unless
+ * backwards compatibility with the older data format (with dasherized names) 
is required.
+ *
+ * @param <T> the type of the POJO being (de)serialized.
+ */
+public class JsonSerdeV2<T> implements Serde<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JsonSerdeV2.class);
+  private final Class<T> clazz;
+  private transient ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * Constructs a JsonSerdeV2 that returns a LinkedHashMap&lt;String, 
Object&lt; upon deserialization.
+   */
+  public JsonSerdeV2() {
+    this(null);
+  }
+
+  /**
+   * Constructs a JsonSerdeV2 that (de)serializes POJOs of class {@code clazz}.
+   *
+   * @param clazz the class of the POJO being (de)serialized.
+   */
+  public JsonSerdeV2(Class<T> clazz) {
+    this.clazz = clazz;
+  }
+
+  public static <T> JsonSerdeV2<T> of(Class<T> clazz) {
+    return new JsonSerdeV2<>(clazz);
+  }
+
+  public byte[] toBytes(T obj) {
+    if (obj != null) {
+      try {
+        String str = mapper.writeValueAsString(obj);
+        return str.getBytes("UTF-8");
+      } catch (Exception e) {
+        throw new SamzaException("Error serializing data.", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  public T fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      String str;
+      try {
+        str = new String(bytes, "UTF-8");
+      } catch (UnsupportedEncodingException e) {
+        throw new SamzaException("Error deserializing data", e);
+      }
+
+      try {
+        if (clazz != null) {
+          return mapper.readValue(str, clazz);
+        } else {
+          return mapper.readValue(str, new TypeReference<T>() { });
+        }
+      } catch (Exception e) {
+        LOG.debug("Error deserializing data: " + str, e);
+        throw new SamzaException("Error deserializing data", e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+    in.defaultReadObject();
+    this.mapper = new ObjectMapper();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java 
b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java
new file mode 100644
index 0000000..2058e6b
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/serializers/JsonSerdeV2Factory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+public class JsonSerdeV2Factory implements SerdeFactory<Object> {
+  public JsonSerdeV2<Object> getSerde(String name, Config config) {
+    return new JsonSerdeV2<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java
new file mode 100644
index 0000000..10f224b
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/KVSerde.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.operators.KV;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A marker serde class to indicate that messages are keyed and should be 
deserialized as K-V pairs. This class is
+ * intended for use cases where a single Serde parameter or configuration is 
required.
+ *
+ * @param <K> type of the key in the message
+ * @param <V> type of the value in the message
+ */
+public class KVSerde<K, V> implements Serde<KV<K, V>> {
+
+  private final Serde<K> keySerde;
+  private final Serde<V> valueSerde;
+
+  public KVSerde(Serde<K> keySerde, Serde<V> valueSerde) {
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+  }
+
+  public static <K, V> KVSerde<K, V> of(Serde<K> keySerde, Serde<V> 
valueSerde) {
+    return new KVSerde<>(keySerde, valueSerde);
+  }
+
+  public KV<K, V> fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      int keyLength = byteBuffer.getInt();
+      byte[] keyBytes = new byte[keyLength];
+      byteBuffer.get(keyBytes);
+      int valueLength = byteBuffer.getInt();
+      byte[] valueBytes = new byte[valueLength];
+      byteBuffer.get(valueBytes);
+      K key = keySerde.fromBytes(keyBytes);
+      V value = valueSerde.fromBytes(valueBytes);
+      return KV.of(key, value);
+    } else {
+      return null;
+    }
+  }
+
+  public byte[] toBytes(KV<K, V> obj) {
+    if (obj != null) {
+      byte[] keyBytes = keySerde.toBytes(obj.key);
+      byte[] valueBytes = valueSerde.toBytes(obj.value);
+      byte[] bytes = new byte[8 + keyBytes.length + 8 + valueBytes.length];
+      ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+      byteBuffer.putInt(keyBytes.length);
+      byteBuffer.put(keyBytes);
+      byteBuffer.putInt(valueBytes.length);
+      byteBuffer.put(valueBytes);
+      return byteBuffer.array();
+    } else {
+      return null;
+    }
+  }
+
+  public Serde<K> getKeySerde() {
+    return this.keySerde;
+  }
+
+  public Serde<V> getValueSerde() {
+    return this.valueSerde;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java
new file mode 100644
index 0000000..fb61380
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/LongSerde.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A serializer for longs
+ */
+public class LongSerde implements Serde<Long> {
+
+  public byte[] toBytes(Long obj) {
+    if (obj != null) {
+      return ByteBuffer.allocate(8).putLong(obj).array();
+    } else {
+      return null;
+    }
+  }
+
+  // big-endian by default
+  public Long fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      return ByteBuffer.wrap(bytes).getLong();
+    } else {
+      return null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java 
b/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java
new file mode 100644
index 0000000..e2ae811
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/LongSerdeFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+public class LongSerdeFactory implements SerdeFactory<Long> {
+
+  public LongSerde getSerde(String name, Config config) {
+    return new LongSerde();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java
new file mode 100644
index 0000000..b7faef9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/NoOpSerde.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers;
+
+import org.apache.samza.SamzaException;
+
+/**
+ * A marker serde class to indicate that messages should not be serialized or 
deserialized. This is the same behavior as
+ * when no serde is provided, and is intended for use cases where a Serde 
parameter or configuration is required. This
+ * is different than ByteSerde which is a pass-through serde for byte arrays.
+ *
+ * @param <T> type of messages which should not be serialized or deserialized
+ */
+public class NoOpSerde<T> implements Serde<T> {
+
+  public T fromBytes(byte[] bytes) {
+    throw new SamzaException("NoOpSerde fromBytes should not be invoked by the 
framework.");
+  }
+
+
+  public byte[] toBytes(T obj) {
+    throw new SamzaException("NoOpSerde toBytes should not be invoked by the 
framework.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
new file mode 100644
index 0000000..d70746c
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerde.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+
+import org.apache.samza.SamzaException;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * A serializer for Serializable objects
+ */
+public class SerializableSerde<T extends Serializable> implements Serde<T> {
+
+  public byte[] toBytes(T obj) {
+    if (obj != null) {
+      ByteArrayOutputStream bos = new ByteArrayOutputStream();
+      ObjectOutputStream oos = null;
+      try {
+        oos = new ObjectOutputStream(bos);
+        oos.writeObject(obj);
+      } catch (IOException e) {
+        throw new SamzaException("Error writing to output stream", e);
+      } finally {
+        try {
+          if (oos != null) {
+            oos.close();
+          }
+        } catch (IOException e) {
+          throw new SamzaException("Error closing output stream", e);
+        }
+      }
+
+      return bos.toByteArray();
+    } else {
+      return null;
+    }
+  }
+
+  public T fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+      ObjectInputStream ois = null;
+
+      try {
+        ois = new ObjectInputStream(bis);
+        return (T) ois.readObject();
+      } catch (IOException | ClassNotFoundException e) {
+        throw new SamzaException("Error reading from input stream.");
+      } finally {
+        try {
+          if (ois != null) {
+            ois.close();
+          }
+        } catch (IOException e) {
+          throw new SamzaException("Error closing input stream", e);
+        }
+      }
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java
 
b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java
new file mode 100644
index 0000000..2bc78b5
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/serializers/SerializableSerdeFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+import java.io.Serializable;
+
+public class SerializableSerdeFactory<T extends Serializable> implements 
SerdeFactory<T> {
+
+  public Serde<T> getSerde(String name, Config config) {
+    return new SerializableSerde<>();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java
new file mode 100644
index 0000000..c2c2874
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/StringSerde.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.SamzaException;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ * A serializer for strings
+ */
+public class StringSerde implements Serde<String> {
+
+  private final String encoding;
+
+  public StringSerde(String encoding) {
+    this.encoding = encoding;
+  }
+
+  public StringSerde() {
+    this("UTF-8");
+  }
+
+  public byte[] toBytes(String obj) {
+    if (obj != null) {
+      try {
+        return obj.getBytes(encoding);
+      } catch (UnsupportedEncodingException e) {
+        throw new SamzaException("Unsupported encoding " + encoding, e);
+      }
+    } else {
+      return null;
+    }
+  }
+
+  public String fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      try {
+        return new String(bytes, 0, bytes.length, encoding);
+      } catch (UnsupportedEncodingException e) {
+        throw new SamzaException("Unsupported encoding " + encoding, e);
+      }
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java 
b/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java
new file mode 100644
index 0000000..2f61fe1
--- /dev/null
+++ 
b/samza-api/src/main/java/org/apache/samza/serializers/StringSerdeFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+public class StringSerdeFactory implements SerdeFactory<String> {
+
+  public Serde<String> getSerde(String name, Config config) {
+    return new StringSerde(config.get("encoding", "UTF-8"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java 
b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java
new file mode 100644
index 0000000..3cd681f
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerde.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+/**
+ * A serializer for UUID
+ */
+public class UUIDSerde implements Serde<UUID> {
+
+  public byte[] toBytes(UUID obj) {
+    if (obj != null) {
+      return ByteBuffer.allocate(16)
+          .putLong(obj.getMostSignificantBits())
+          .putLong(obj.getLeastSignificantBits())
+          .array();
+    } else {
+      return null;
+    }
+  }
+
+  public UUID fromBytes(byte[] bytes) {
+    if (bytes != null) {
+      ByteBuffer buffer = ByteBuffer.wrap(bytes);
+      return new UUID(buffer.getLong(), buffer.getLong());
+    } else {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java 
b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java
new file mode 100644
index 0000000..4394824
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/serializers/UUIDSerdeFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers;
+
+import org.apache.samza.config.Config;
+
+import java.util.UUID;
+
+public class UUIDSerdeFactory implements SerdeFactory<UUID> {
+
+  public Serde<UUID> getSerde(String name, Config config) {
+    return new UUIDSerde();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java 
b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 96fa81c..60a605b 100644
--- 
a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ 
b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -22,7 +22,7 @@ package org.apache.samza.system;
 import java.nio.charset.Charset;
 
 /**
- * This class represents a message entvelope that is received by a StreamTask 
for each message that is received from a
+ * This class represents a message envelope that is received by a StreamTask 
for each message that is received from a
  * partition of a specific input stream.
  */
 public class IncomingMessageEnvelope {

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
deleted file mode 100644
index adb8781..0000000
--- 
a/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-import java.nio.ByteBuffer
-
-/**
- * A serializer for ByteBuffers.
- */
-class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
-  def getSerde(name: String, config: Config): Serde[ByteBuffer] = new 
ByteBufferSerde
-}
-
-class ByteBufferSerde extends Serde[ByteBuffer] {
-  def toBytes(byteBuffer: ByteBuffer) = {
-    if (byteBuffer != null) {
-      val bytes = new Array[Byte](byteBuffer.remaining())
-      byteBuffer.duplicate().get(bytes)
-      bytes
-    } else {
-      null
-    }
-  }
-
-  def fromBytes(bytes: Array[Byte]) = if (bytes != null) {
-    ByteBuffer.wrap(bytes)
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
deleted file mode 100644
index 968da26..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for bytes that is effectively a no-op but can be useful for 
- * binary messages.
- */
-class ByteSerdeFactory extends SerdeFactory[Array[Byte]] {
-  def getSerde(name: String, config: Config): Serde[Array[Byte]] = new 
ByteSerde
-}
-
-class ByteSerde extends Serde[Array[Byte]] {
-  def toBytes(bytes: Array[Byte]) = bytes
-
-  def fromBytes(bytes: Array[Byte]) = bytes
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
deleted file mode 100644
index 7981d2c..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for doubles
- */
-class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] {
-  def getSerde(name: String, config: Config): Serde[java.lang.Double] = new 
DoubleSerde
-}
-
-class DoubleSerde extends Serde[java.lang.Double] {
-  def toBytes(obj: java.lang.Double): Array[Byte] = if (obj != null) {
-    ByteBuffer.allocate(8).putDouble(obj.doubleValue()).array
-  } else {
-    null
-  }
-
-  // big-endian by default
-  def fromBytes(bytes: Array[Byte]): java.lang.Double = if (bytes != null) {
-    ByteBuffer.wrap(bytes).getDouble
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
deleted file mode 100644
index 46509f7..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for integers
- */
-class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] {
-  def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new 
IntegerSerde
-}
-
-class IntegerSerde extends Serde[java.lang.Integer] {
-  def toBytes(obj: java.lang.Integer): Array[Byte] = if (obj != null) {
-    ByteBuffer.allocate(4).putInt(obj.intValue).array
-  } else {
-    null
-  }
-
-  // big-endian by default
-  def fromBytes(bytes: Array[Byte]): java.lang.Integer = if (bytes != null) {
-    ByteBuffer.wrap(bytes).getInt
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
deleted file mode 100644
index 446035c..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.SamzaException
-import org.apache.samza.config.Config
-import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-import org.slf4j.LoggerFactory
-
-
-/**
-  * A serializer for JSON strings. JsonSerdeV2 differs from JsonSerde in that:
-  * <ol>
-  *   <li>
-  *     It allows specifying the specific POJO type to deserialize to (using 
JsonSerdeV2(Class[T])
-  *     or JsonSerdeV2#of(Class[T]). JsonSerde always returns a 
LinkedHashMap<String, Object> upon deserialization.
-  *   <li>
-  *     It uses Jackson's default 'camelCase' property naming convention, 
which simplifies defining
-  *     the POJO to bind to. JsonSerde enforces the 'dash-separated' property 
naming convention.
-  * </ol>
-  * This JsonSerdeV2 should be preferred over JsonSerde for High Level API 
applications, unless
-  * backwards compatibility with the older data format (with dasherized names) 
is required.
-  *
-  * @param clazzOption the class of the POJO being (de)serialized. If this is 
None,
-  *                    a LinkedHashMap<String, Object> is returned upon 
deserialization.
-  * @tparam T the type of the POJO being (de)serialized.
-  */
-class JsonSerdeV2[T] private(clazzOption: Option[Class[T]]) extends Serde[T] {
-  private val LOG = LoggerFactory.getLogger(classOf[JsonSerdeV2[T]])
-  @transient lazy private val mapper = new ObjectMapper()
-
-  def this() {
-    this(None)
-  }
-
-  def this(clazz: Class[T]) {
-    this(Option(clazz))
-  }
-
-  def toBytes(obj: T): Array[Byte] = {
-    try {
-      val str = mapper.writeValueAsString(obj)
-      str.getBytes("UTF-8")
-    } catch {
-      case e: Exception => throw new SamzaException(e);
-    }
-  }
-
-  def fromBytes(bytes: Array[Byte]): T = {
-    val str = new String(bytes, "UTF-8")
-     try {
-       clazzOption match {
-         case Some(clazz) => mapper.readValue(str, clazz)
-         case None => mapper.readValue(str, new TypeReference[T]() {})
-       }
-     } catch {
-       case e: Exception =>
-         LOG.debug(s"Error deserializing message: $str", e)
-         throw new SamzaException(e)
-     }
-  }
-
-}
-
-object JsonSerdeV2 {
-  def of[T](clazz: Class[T]): JsonSerdeV2[T] = {
-    new JsonSerdeV2[T](clazz)
-  }
-}
-
-class JsonSerdeV2Factory extends SerdeFactory[Object] {
-  def getSerde(name: String, config: Config) = new JsonSerdeV2
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
deleted file mode 100644
index f5cd5cf..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-
-import org.apache.samza.operators.KV
-
-object KVSerde {
-  def of[K, V](keySerde: Serde[K], valueSerde: Serde[V]) = new KVSerde[K, 
V](keySerde, valueSerde)
-}
-
-/**
-  * A serde for [[KV]] key-value pairs.
-  *
-  * When this serde is used for streams in the High Level API, Samza wires up 
and uses the provided
-  * keySerde and valueSerde for the keys and values in the stream separately. 
I.e., the fromBytes and toBytes
-  * methods in this class aren't used directly for streams.
-  *
-  * @tparam K type of the key in the message
-  * @tparam V type of the value in the message
-  */
-class KVSerde[K, V](keySerde: Serde[K], valueSerde: Serde[V]) extends 
Serde[KV[K, V]] {
-  override def fromBytes(bytes: Array[Byte]): KV[K, V] = {
-    val byteBuffer = ByteBuffer.wrap(bytes)
-    val keyLength = byteBuffer.getInt()
-    val keyBytes = new Array[Byte](keyLength)
-    byteBuffer.get(keyBytes)
-    val valueLength = byteBuffer.getInt()
-    val valueBytes = new Array[Byte](valueLength)
-    byteBuffer.get(valueBytes)
-    val key = keySerde.fromBytes(keyBytes)
-    val value = valueSerde.fromBytes(valueBytes)
-    KV.of(key, value)
-  }
-
-  override def toBytes(obj: KV[K, V]): Array[Byte] = {
-    val keyBytes = keySerde.toBytes(obj.key)
-    val valueBytes = valueSerde.toBytes(obj.value)
-    val bytes = new Array[Byte](8 + keyBytes.length + 8 + valueBytes.length)
-    val byteBuffer = ByteBuffer.wrap(bytes)
-    byteBuffer.putInt(keyBytes.length)
-    byteBuffer.put(keyBytes)
-    byteBuffer.putInt(valueBytes.length)
-    byteBuffer.put(valueBytes)
-    byteBuffer.array()
-  }
-
-  def getKeySerde: Serde[K] = keySerde
-
-  def getValueSerde: Serde[V] = valueSerde
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
deleted file mode 100644
index 41ff598..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for longs
- */
-class LongSerdeFactory extends SerdeFactory[java.lang.Long] {
-  def getSerde(name: String, config: Config): Serde[java.lang.Long] = new 
LongSerde
-}
-
-class LongSerde extends Serde[java.lang.Long] {
-  def toBytes(obj: java.lang.Long): Array[Byte] = if (obj != null) {
-    ByteBuffer.allocate(8).putLong(obj.longValue()).array
-  } else {
-    null
-  }
-
-  // big-endian by default
-  def fromBytes(bytes: Array[Byte]): java.lang.Long = if (bytes != null) {
-    ByteBuffer.wrap(bytes).getLong
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
deleted file mode 100644
index c656526..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.serializers
-
-/**
-  * A marker serde class to indicate that messages should not be serialized or 
deserialized.
-  * This is the same behavior as when no serde is provided, and is intended 
for use cases where
-  * a Serde parameter or configuration is required.
-  * This is different than [[ByteSerde]] which is a pass-through serde for 
byte arrays.
-  *
-  * @tparam T type of messages which should not be serialized or deserialized
-  */
-class NoOpSerde[T] extends Serde[T] {
-
-  override def fromBytes(bytes: Array[Byte]): T =
-    throw new NotImplementedError("NoOpSerde fromBytes should not be invoked 
by the framework.")
-
-  override def toBytes(obj: T): Array[Byte] =
-    throw new NotImplementedError("NoOpSerde toBytes should not be invoked by 
the framework.")
-
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
deleted file mode 100644
index c43f863..0000000
--- 
a/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.io.ByteArrayInputStream
-import java.io.ByteArrayOutputStream
-import java.io.ObjectInputStream
-import java.io.ObjectOutputStream
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for Serializable
- */
-class SerializableSerdeFactory[T <: java.io.Serializable] extends 
SerdeFactory[T] {
-  def getSerde(name: String, config: Config): Serde[T] =
-    new SerializableSerde[T]
-}
-
-class SerializableSerde[T <: java.io.Serializable] extends Serde[T] {
-  def toBytes(obj: T): Array[Byte] = if (obj != null) {
-    val bos = new ByteArrayOutputStream
-    val oos = new ObjectOutputStream(bos)
-
-    try {
-      oos.writeObject(obj)
-    }
-    finally {
-      oos.close()
-    }
-
-    bos.toByteArray
-  } else {
-    null
-  }
-
-  def fromBytes(bytes: Array[Byte]): T = if (bytes != null) {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis)
-
-    try {
-      ois.readObject.asInstanceOf[T]
-    }
-    finally{
-      ois.close()
-    }
-  } else {
-    null.asInstanceOf[T]
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
deleted file mode 100644
index c69c402..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for strings
- */
-class StringSerdeFactory extends SerdeFactory[String] {
-  def getSerde(name: String, config: Config): Serde[String] =
-    new StringSerde(config.get("encoding", "UTF-8"))
-}
-
-class StringSerde(val encoding: String) extends Serde[String] {
-  // constructor (for Java) that defaults to UTF-8 encoding
-  def this() {
-    this("UTF-8")
-  }
-
-  def toBytes(obj: String): Array[Byte] = if (obj != null) {
-    obj.toString.getBytes(encoding)
-  } else {
-    null
-  }
-
-  def fromBytes(bytes: Array[Byte]): String = if (bytes != null) {
-    new String(bytes, 0, bytes.size, encoding)
-  } else {
-    null
-  }
-}

http://git-wip-us.apache.org/repos/asf/samza/blob/357d6ca7/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala 
b/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
deleted file mode 100644
index 88d4327..0000000
--- a/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import java.util.UUID
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for UUID
- */
-class UUIDSerdeFactory extends SerdeFactory[UUID] {
-  def getSerde(name: String, config: Config): Serde[UUID] = new UUIDSerde
-}
-
-class UUIDSerde() extends Serde[UUID] {
-  def toBytes(obj: UUID): Array[Byte] = if (obj != null) {
-    
ByteBuffer.allocate(16).putLong(obj.getMostSignificantBits).putLong(obj.getLeastSignificantBits).array
-  } else {
-    null
-  }
-
-  def fromBytes(bytes: Array[Byte]): UUID = if (bytes != null) {
-    val buffer = ByteBuffer.wrap(bytes)
-    new UUID(buffer.getLong, buffer.getLong)
-  } else {
-    null
-  }
-}

Reply via email to