Repository: samza
Updated Branches:
  refs/heads/master 38e81c0f9 -> 5fab34e52


SAMZA-936: add double serde


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5fab34e5
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5fab34e5
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5fab34e5

Branch: refs/heads/master
Commit: 5fab34e52982a2fe469106382dbd365fbf6f771f
Parents: 38e81c0
Author: Jon Bringhurst <[email protected]>
Authored: Thu Jun 23 15:32:22 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Thu Jun 23 15:32:22 2016 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     |  4 ++
 .../apache/samza/serializers/DoubleSerde.scala  | 45 ++++++++++++++++++++
 .../main/scala/org/apache/samza/util/Util.scala |  3 +-
 .../samza/serializers/TestDoubleSerde.scala     | 40 +++++++++++++++++
 .../scala/org/apache/samza/util/TestUtil.scala  |  1 +
 5 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html 
b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 77907eb..54c5298 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -775,6 +775,10 @@
                             <dd>Encodes <code>java.lang.String</code> objects 
as UTF-8.</dd>
                             
<dt><code>org.apache.samza.serializers.JsonSerdeFactory</code></dt>
                             <dd>Encodes nested structures of 
<code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.</dd>
+                            
<dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.lang.Long</code> as binary 
(8 bytes fixed-length big-endian encoding).</dd>
+                            
<dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt>
+                            <dd>Encodes <code>java.lang.Double</code> as 
binary (8 bytes, big-endian double-precision floating point).</dd>
                             
<dt><code>org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code></dt>
                             <dd>Encodes 
<code>org.apache.samza.metrics.reporter.MetricsSnapshot</code> objects (which 
are
                                 used for <a 
href="../container/metrics.html">reporting metrics</a>) as JSON.</dd>

http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala 
b/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
new file mode 100644
index 0000000..7981d2c
--- /dev/null
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * A factory for DoubleSerde, a serializer for doubles
+ */
+class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] {
+  def getSerde(name: String, config: Config): Serde[java.lang.Double] = new 
DoubleSerde
+}
+
+class DoubleSerde extends Serde[java.lang.Double] {
+  def toBytes(obj: java.lang.Double): Array[Byte] = if (obj != null) {
+    ByteBuffer.allocate(8).putDouble(obj.doubleValue()).array
+  } else {
+    null
+  }
+
+  // ByteBuffer reads and writes in big-endian byte order by default
+  def fromBytes(bytes: Array[Byte]): java.lang.Double = if (bytes != null) {
+    ByteBuffer.wrap(bytes).getDouble
+  } else {
+    null
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index 84166b4..95a5aa0 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -185,7 +185,7 @@ object Util extends Logging {
   /**
    * Generates a coordinator stream name based off of the job name and job id
    * for the job. The format of the stream name will be
-   * __samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
+   * &#95;&#95;samza_coordinator_&lt;JOBNAME&gt;_&lt;JOBID&gt;.
    */
   def getCoordinatorStreamName(jobName: String, jobId: String) = {
     "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), 
jobId.replaceAll("_", "-"))
@@ -359,6 +359,7 @@ object Util extends Logging {
       case "long" => classOf[LongSerdeFactory].getCanonicalName
       case "serializable" => 
classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
       case "string" => classOf[StringSerdeFactory].getCanonicalName
+      case "double" => classOf[DoubleSerdeFactory].getCanonicalName
       case _ => throw new SamzaException("defaultSerdeFactoryFromSerdeName: No 
class defined for serde %s" format serdeName)
     }
     info("use default serde %s for %s" format (serde, serdeName))

http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
new file mode 100644
index 0000000..60241b5
--- /dev/null
+++ 
b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+
+import org.junit.Assert._
+import org.junit.Test
+
+class TestDoubleSerde {
+  @Test
+  def testDoubleSerde {
+    val serde = new DoubleSerde
+    assertEquals(null, serde.toBytes(null))
+    assertEquals(null, serde.fromBytes(null))
+
+    val fooBar = 9.156013e-002
+    val fooBarBytes = serde.toBytes(fooBar)
+    fooBarBytes.foreach(System.err.println)
+    assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), 
fooBarBytes)
+    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala 
b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
index b5c212a..da7c71d 100644
--- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
+++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala
@@ -82,6 +82,7 @@ class TestUtil {
     assertEquals(classOf[LongSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("long"))
     
assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName,
 defaultSerdeFactoryFromSerdeName("serializable"))
     assertEquals(classOf[StringSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("string"))
+    assertEquals(classOf[DoubleSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("double"))
 
     // throw SamzaException if can not find the correct serde
     var throwSamzaException = false

Reply via email to