Repository: samza Updated Branches: refs/heads/master 38e81c0f9 -> 5fab34e52
SAMZA-936: add double serde Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5fab34e5 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5fab34e5 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5fab34e5 Branch: refs/heads/master Commit: 5fab34e52982a2fe469106382dbd365fbf6f771f Parents: 38e81c0 Author: Jon Bringhurst <[email protected]> Authored: Thu Jun 23 15:32:22 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Thu Jun 23 15:32:22 2016 -0700 ---------------------------------------------------------------------- .../versioned/jobs/configuration-table.html | 4 ++ .../apache/samza/serializers/DoubleSerde.scala | 45 ++++++++++++++++++++ .../main/scala/org/apache/samza/util/Util.scala | 3 +- .../samza/serializers/TestDoubleSerde.scala | 40 +++++++++++++++++ .../scala/org/apache/samza/util/TestUtil.scala | 1 + 5 files changed, 92 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/docs/learn/documentation/versioned/jobs/configuration-table.html ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 77907eb..54c5298 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -775,6 +775,10 @@ <dd>Encodes <code>java.lang.String</code> objects as UTF-8.</dd> <dt><code>org.apache.samza.serializers.JsonSerdeFactory</code></dt> <dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. 
as JSON.</dd> + <dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt> + <dd>Encodes <code>java.lang.Long</code> as binary (8 bytes fixed-length big-endian encoding).</dd> + <dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt> + <dd>Encodes <code>java.lang.Double</code> as binary (8 bytes fixed-length big-endian IEEE 754 double-precision floating-point encoding).</dd> <dt><code>org.apache.samza.serializers.MetricsSnapshotSerdeFactory</code></dt> <dd>Encodes <code>org.apache.samza.metrics.reporter.MetricsSnapshot</code> objects (which are used for <a href="../container/metrics.html">reporting metrics</a>) as JSON.</dd> http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala new file mode 100644 index 0000000..7981d2c --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.samza.serializers + +import java.nio.ByteBuffer +import org.apache.samza.config.Config + +/** + * A serializer for doubles + */ +class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] { + def getSerde(name: String, config: Config): Serde[java.lang.Double] = new DoubleSerde +} + +class DoubleSerde extends Serde[java.lang.Double] { + def toBytes(obj: java.lang.Double): Array[Byte] = if (obj != null) { + ByteBuffer.allocate(8).putDouble(obj.doubleValue()).array + } else { + null + } + + // big-endian by default + def fromBytes(bytes: Array[Byte]): java.lang.Double = if (bytes != null) { + ByteBuffer.wrap(bytes).getDouble + } else { + null + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index 84166b4..95a5aa0 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -185,7 +185,7 @@ object Util extends Logging { /** * Generates a coordinator stream name based off of the job name and job id * for the jobd. The format is of the stream name will be - * __samza_coordinator_<JOBNAME>_<JOBID>. + * __samza_coordinator_<JOBNAME>_<JOBID>. 
*/ def getCoordinatorStreamName(jobName: String, jobId: String) = { "__samza_coordinator_%s_%s" format (jobName.replaceAll("_", "-"), jobId.replaceAll("_", "-")) @@ -359,6 +359,7 @@ object Util extends Logging { case "long" => classOf[LongSerdeFactory].getCanonicalName case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName case "string" => classOf[StringSerdeFactory].getCanonicalName + case "double" => classOf[DoubleSerdeFactory].getCanonicalName case _ => throw new SamzaException("defaultSerdeFactoryFromSerdeName: No class defined for serde %s" format serdeName) } info("use default serde %s for %s" format (serde, serdeName)) http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala new file mode 100644 index 0000000..60241b5 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers + +import java.util.Arrays + +import org.junit.Assert._ +import org.junit.Test + +class TestDoubleSerde { + @Test + def testDoubleSerde { + val serde = new DoubleSerde + assertEquals(null, serde.toBytes(null)) + assertEquals(null, serde.fromBytes(null)) + + val fooBar = 9.156013e-002 + val fooBarBytes = serde.toBytes(fooBar) + fooBarBytes.foreach(System.err.println) + assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes) + assertEquals(fooBar, serde.fromBytes(fooBarBytes)) + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/5fab34e5/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala index b5c212a..da7c71d 100644 --- a/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala +++ b/samza-core/src/test/scala/org/apache/samza/util/TestUtil.scala @@ -82,6 +82,7 @@ class TestUtil { assertEquals(classOf[LongSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("long")) assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, defaultSerdeFactoryFromSerdeName("serializable")) assertEquals(classOf[StringSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("string")) + assertEquals(classOf[DoubleSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("double")) // throw SamzaException if can not find the correct serde var throwSamzaException = false
