Repository: incubator-samza Updated Branches: refs/heads/master 2283fd236 -> 0da1090f2
SAMZA-473; remove samza-serializers package Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/0da1090f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/0da1090f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/0da1090f Branch: refs/heads/master Commit: 0da1090f24d86a274f26ad6e9e2ee76c78b77427 Parents: 2283fd2 Author: Chris Riccomini <[email protected]> Authored: Fri Nov 21 14:33:59 2014 -0800 Committer: Chris Riccomini <[email protected]> Committed: Fri Nov 21 14:33:59 2014 -0800 ---------------------------------------------------------------------- build.gradle | 13 ------ .../versioned/container/serialization.md | 2 +- .../apache/samza/serializers/JsonSerde.scala | 38 ++++++++++++++++ .../serializers/MetricsSnapshotSerde.scala | 42 +++++++++++++++++ .../samza/serializers/TestJsonSerde.scala | 35 +++++++++++++++ .../serializers/TestMetricsSnapshotSerde.scala | 47 ++++++++++++++++++++ .../apache/samza/serializers/JsonSerde.scala | 38 ---------------- .../serializers/MetricsSnapshotSerde.scala | 42 ----------------- .../samza/serializers/TestJsonSerde.scala | 35 --------------- .../serializers/TestMetricsSnapshotSerde.scala | 47 -------------------- settings.gradle | 1 - 11 files changed, 163 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 772d162..a9c4218 100644 --- a/build.gradle +++ b/build.gradle @@ -149,7 +149,6 @@ project(":samza-kafka_$scalaVersion") { dependencies { compile project(':samza-api') compile project(":samza-core_$scalaVersion") - compile project(":samza-serializers_$scalaVersion") compile "org.scala-lang:scala-library:$scalaLibVersion" compile "com.101tec:zkclient:$zkClientVersion" compile "org.apache.zookeeper:zookeeper:$zookeeperVersion" @@ -180,18 +179,6 @@ project(':samza-log4j') { } } -project(":samza-serializers_$scalaVersion") { - apply plugin: 'scala' - - dependencies { - compile project(':samza-api') - compile project(":samza-core_$scalaVersion") - compile "org.scala-lang:scala-library:$scalaLibVersion" - compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion" - testCompile "junit:junit:$junitVersion" - } -} - project(":samza-yarn_$scalaVersion") { apply plugin: 'scala' apply plugin: 'lesscss' http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/docs/learn/documentation/versioned/container/serialization.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/container/serialization.md b/docs/learn/documentation/versioned/container/serialization.md index ff7d8b9..1f07a7c 100644 --- a/docs/learn/documentation/versioned/container/serialization.md +++ b/docs/learn/documentation/versioned/container/serialization.md @@ -53,7 +53,7 @@ stores.LastPageViewPerUser.key.serde=integer stores.LastPageViewPerUser.msg.serde=json {% endhighlight %} -Each serde is defined with a factory class. Samza comes with several builtin serdes for UTF-8 strings, binary-encoded integers, JSON (requires the samza-serializers dependency) and more. You can also create your own serializer by implementing the [SerdeFactory](../api/javadocs/org/apache/samza/serializers/SerdeFactory.html) interface. +Each serde is defined with a factory class. Samza comes with several builtin serdes for UTF-8 strings, binary-encoded integers, JSON and more. You can also create your own serializer by implementing the [SerdeFactory](../api/javadocs/org/apache/samza/serializers/SerdeFactory.html) interface. The name you give to a serde (such as "json" and "integer" in the example above) is only for convenience in your job configuration; you can choose whatever name you like. For each stream and each state store, you can use the serde name to declare how messages should be serialized and deserialized. http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala new file mode 100644 index 0000000..744eec0 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers +import org.codehaus.jackson.map.ObjectMapper +import java.nio.ByteBuffer +import org.apache.samza.config.Config + +class JsonSerde extends Serde[Object] { + val objectMapper = new ObjectMapper + + def toBytes(obj: Object) = objectMapper + .writeValueAsString(obj) + .getBytes("UTF-8") + + def fromBytes(bytes: Array[Byte]) = objectMapper + .readValue(bytes, classOf[Object]) +} + +class JsonSerdeFactory extends SerdeFactory[Object] { + def getSerde(name: String, config: Config) = new JsonSerde +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala new file mode 100644 index 0000000..455dd34 --- /dev/null +++ b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers +import org.apache.samza.config.Config +import org.codehaus.jackson.map.ObjectMapper +import java.util.Map +import java.nio.ByteBuffer +import org.apache.samza.metrics.reporter.MetricsSnapshot + +class MetricsSnapshotSerde extends Serde[MetricsSnapshot] { + val jsonMapper = new ObjectMapper + + def toBytes(obj: MetricsSnapshot) = jsonMapper + .writeValueAsString(obj.getAsMap) + .getBytes("UTF-8") + + def fromBytes(bytes: Array[Byte]) = { + val metricMap = jsonMapper.readValue(bytes, classOf[java.util.Map[String, java.util.Map[String, Object]]]) + MetricsSnapshot.fromMap(metricMap) + } +} + +class MetricsSnapshotSerdeFactory extends SerdeFactory[MetricsSnapshot] { + def getSerde(name: String, config: Config) = new MetricsSnapshotSerde +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala new file mode 100644 index 0000000..6046071 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers + +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.JavaConversions._ + +class TestJsonSerde { + @Test + def testJsonSerdeShouldWork { + val serde = new JsonSerde + val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2))) + val bytes = serde.toBytes(obj) + assertEquals(obj, serde.fromBytes(bytes)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala new file mode 100644 index 0000000..5bc0be6 --- /dev/null +++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.serializers + +import java.util.HashMap +import java.util.Map + +import org.apache.samza.metrics.reporter.MetricsSnapshot +import org.apache.samza.metrics.reporter.MetricsHeader +import org.apache.samza.metrics.reporter.Metrics +import org.junit.Assert._ +import org.junit.Ignore +import org.junit.Test + +class TestMetricsSnapshotSerde { + @Ignore + @Test + def testMetricsSerdeShouldSerializeAndDeserializeAMetric { + val header = new MetricsHeader("test", "testjobid", "task", "test", "version", "samzaversion", "host", 1L, 2L) + val metricsMap = new HashMap[String, Object]() + metricsMap.put("test2", "foo") + val metricsGroupMap = new HashMap[String, Map[String, Object]]() + metricsGroupMap.put("test", metricsMap) + val metrics = Metrics.fromMap(metricsGroupMap) + val snapshot = new MetricsSnapshot(header, metrics) + val serde = new MetricsSnapshotSerde() + val bytes = serde.toBytes(snapshot) + assertTrue(serde.fromBytes(bytes).equals(metrics)) + } +} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-serializers/src/main/scala/org/apache/samza/serializers/JsonSerde.scala ---------------------------------------------------------------------- diff --git a/samza-serializers/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-serializers/src/main/scala/org/apache/samza/serializers/JsonSerde.scala deleted file mode 100644 index 744eec0..0000000 --- a/samza-serializers/src/main/scala/org/apache/samza/serializers/JsonSerde.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers -import org.codehaus.jackson.map.ObjectMapper -import java.nio.ByteBuffer -import org.apache.samza.config.Config - -class JsonSerde extends Serde[Object] { - val objectMapper = new ObjectMapper - - def toBytes(obj: Object) = objectMapper - .writeValueAsString(obj) - .getBytes("UTF-8") - - def fromBytes(bytes: Array[Byte]) = objectMapper - .readValue(bytes, classOf[Object]) -} - -class JsonSerdeFactory extends SerdeFactory[Object] { - def getSerde(name: String, config: Config) = new JsonSerde -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-serializers/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala ---------------------------------------------------------------------- diff --git a/samza-serializers/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala b/samza-serializers/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala deleted file mode 100644 index 455dd34..0000000 --- a/samza-serializers/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers -import org.apache.samza.config.Config -import org.codehaus.jackson.map.ObjectMapper -import java.util.Map -import java.nio.ByteBuffer -import org.apache.samza.metrics.reporter.MetricsSnapshot - -class MetricsSnapshotSerde extends Serde[MetricsSnapshot] { - val jsonMapper = new ObjectMapper - - def toBytes(obj: MetricsSnapshot) = jsonMapper - .writeValueAsString(obj.getAsMap) - .getBytes("UTF-8") - - def fromBytes(bytes: Array[Byte]) = { - val metricMap = jsonMapper.readValue(bytes, classOf[java.util.Map[String, java.util.Map[String, Object]]]) - MetricsSnapshot.fromMap(metricMap) - } -} - -class MetricsSnapshotSerdeFactory extends SerdeFactory[MetricsSnapshot] { - def getSerde(name: String, config: Config) = new MetricsSnapshotSerde -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala ---------------------------------------------------------------------- diff --git a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala b/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala deleted file mode 100644 index 6046071..0000000 --- a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestJsonSerde.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import org.junit.Assert._ -import org.junit.Test - -import scala.collection.JavaConversions._ - -class TestJsonSerde { - @Test - def testJsonSerdeShouldWork { - val serde = new JsonSerde - val obj = new java.util.HashMap[String, Object](Map[String, Object]("hi" -> "bye", "why" -> new java.lang.Integer(2))) - val bytes = serde.toBytes(obj) - assertEquals(obj, serde.fromBytes(bytes)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala ---------------------------------------------------------------------- diff --git a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala b/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala deleted file mode 100644 index 5bc0be6..0000000 --- a/samza-serializers/src/test/scala/org/apache/samza/serializers/TestMetricsSnapshotSerde.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.serializers - -import java.util.HashMap -import java.util.Map - -import org.apache.samza.metrics.reporter.MetricsSnapshot -import org.apache.samza.metrics.reporter.MetricsHeader -import org.apache.samza.metrics.reporter.Metrics -import org.junit.Assert._ -import org.junit.Ignore -import org.junit.Test - -class TestMetricsSnapshotSerde { - @Ignore - @Test - def testMetricsSerdeShouldSerializeAndDeserializeAMetric { - val header = new MetricsHeader("test", "testjobid", "task", "test", "version", "samzaversion", "host", 1L, 2L) - val metricsMap = new HashMap[String, Object]() - metricsMap.put("test2", "foo") - val metricsGroupMap = new HashMap[String, Map[String, Object]]() - metricsGroupMap.put("test", metricsMap) - val metrics = Metrics.fromMap(metricsGroupMap) - val snapshot = new MetricsSnapshot(header, metrics) - val serde = new MetricsSnapshotSerde() - val bytes = serde.toBytes(snapshot) - assertTrue(serde.fromBytes(bytes).equals(metrics)) - } -} http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/0da1090f/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 216c5ee..3a01fd6 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,7 +25,6 @@ include \ 'samza-kv-leveldb', 'samza-kv-rocksdb', 'samza-log4j', - 'samza-serializers', 'samza-shell', 'samza-yarn', 'samza-test'
