Repository: samza
Updated Branches:
  refs/heads/master 23fb2e1c0 -> fdb90e7e7
SAMZA-660: Added default serdes for changelog streams Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fdb90e7e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fdb90e7e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fdb90e7e Branch: refs/heads/master Commit: fdb90e7e77993cd26b2c5c0bcbaa48c37e0e6e6f Parents: 23fb2e1 Author: Tommy Becker <[email protected]> Authored: Wed Apr 29 13:36:49 2015 -0700 Committer: Yan Fang <[email protected]> Committed: Wed Apr 29 13:36:49 2015 -0700 ---------------------------------------------------------------------- .../apache/samza/container/SamzaContainer.scala | 34 +++++++++----------- .../samza/container/TestSamzaContainer.scala | 18 +++++------ 2 files changed, 25 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/fdb90e7e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 56819e0..ac4793a 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -113,23 +113,21 @@ object SamzaContainer extends Logging { } /** - * A helper function which returns system's default serde according to the + * A helper function which returns system's default serde factory class according to the * serde name. If not found, throw exception. 
*/ - def defaultSerdesFromSerdeName(serdeName: String, exceptionSystemName: String, config: Config) = { + def defaultSerdeFactoryFromSerdeName(serdeName: String) = { info("looking for default serdes") - def getSerde(serdeFactory: String) = { - Util.getObj[SerdeFactory[Object]](serdeFactory).getSerde(serdeName, config) - } + val serde = serdeName match { - case "byte" => getSerde(classOf[ByteSerdeFactory].getCanonicalName) - case "bytebuffer" => getSerde(classOf[ByteBufferSerdeFactory].getCanonicalName) - case "integer" => getSerde(classOf[IntegerSerdeFactory].getCanonicalName) - case "json" => getSerde(classOf[JsonSerdeFactory].getCanonicalName) - case "long" => getSerde(classOf[LongSerdeFactory].getCanonicalName) - case "serializable" => getSerde(classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName) - case "string" => getSerde(classOf[StringSerdeFactory].getCanonicalName) - case _ => throw new SamzaException("Serde %s for system %s does not exist in configuration." format (serdeName, exceptionSystemName)) + case "byte" => classOf[ByteSerdeFactory].getCanonicalName + case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName + case "integer" => classOf[IntegerSerdeFactory].getCanonicalName + case "json" => classOf[JsonSerdeFactory].getCanonicalName + case "long" => classOf[LongSerdeFactory].getCanonicalName + case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName + case "string" => classOf[StringSerdeFactory].getCanonicalName + case _ => throw new SamzaException("No class defined for serde %s" format serdeName) } info("use default serde %s for %s" format (serde, serdeName)) serde @@ -233,7 +231,7 @@ object SamzaContainer extends Logging { val serdes = serdeNames.map(serdeName => { val serdeClassName = config .getSerdeClass(serdeName) - .getOrElse(throw new SamzaException("No class defined for serde: %s." 
format serdeName)) + .getOrElse(defaultSerdeFactoryFromSerdeName(serdeName)) val serde = Util.getObj[SerdeFactory[Object]](serdeClassName) .getSerde(serdeName, config) @@ -251,7 +249,7 @@ object SamzaContainer extends Logging { .filter(getSerdeName(_).isDefined) .map(systemName => { val serdeName = getSerdeName(systemName).get - val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemName, config)) + val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." format serdeName)) (systemName, serde) }).toMap } @@ -264,7 +262,7 @@ object SamzaContainer extends Logging { .filter(systemStream => getSerdeName(systemStream).isDefined) .map(systemStream => { val serdeName = getSerdeName(systemStream).get - val serde = serdes.getOrElse(serdeName, defaultSerdesFromSerdeName(serdeName, systemStream.toString, config)) + val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." format serdeName)) (systemStream, serde) }).toMap } @@ -454,11 +452,11 @@ object SamzaContainer extends Logging { null } val keySerde = config.getStorageKeySerde(storeName) match { - case Some(keySerde) => serdes(keySerde) + case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("No class defined for serde: %s." format keySerde)) case _ => null } val msgSerde = config.getStorageMsgSerde(storeName) match { - case Some(msgSerde) => serdes(msgSerde) + case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("No class defined for serde: %s." 
format msgSerde)) case _ => null } val storePartitionDir = TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName) http://git-wip-us.apache.org/repos/asf/samza/blob/fdb90e7e/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala index 58d7fe8..f1e9d0e 100644 --- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala +++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala @@ -166,7 +166,7 @@ class TestSamzaContainer extends AssertionsForJUnit { SamzaContainer.safeMain(() => null, exceptionHandler) } catch { case _: Exception => - // Expect some random exception from SamzaContainer because we haven't + // Expect some random exception from SamzaContainer because we haven't // set any environment variables for container ID, etc. 
} assertFalse(caughtException) @@ -179,20 +179,20 @@ class TestSamzaContainer extends AssertionsForJUnit { } @Test - def testDefaultSerdesFromSerdeName { + def testDefaultSerdeFactoryFromSerdeName { import SamzaContainer._ val config = new MapConfig - assertTrue(defaultSerdesFromSerdeName("byte", "testSystemException", config).isInstanceOf[ByteSerde]) - assertTrue(defaultSerdesFromSerdeName("integer", "testSystemException", config).isInstanceOf[IntegerSerde]) - assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", config).isInstanceOf[JsonSerde[Object]]) - assertTrue(defaultSerdesFromSerdeName("long", "testSystemException", config).isInstanceOf[LongSerde]) - assertTrue(defaultSerdesFromSerdeName("serializable", "testSystemException", config).isInstanceOf[SerializableSerde[java.io.Serializable @unchecked]]) - assertTrue(defaultSerdesFromSerdeName("string", "testSystemException", config).isInstanceOf[StringSerde]) + assertEquals(classOf[ByteSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("byte")) + assertEquals(classOf[IntegerSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("integer")) + assertEquals(classOf[JsonSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("json")) + assertEquals(classOf[LongSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("long")) + assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName, defaultSerdeFactoryFromSerdeName("serializable")) + assertEquals(classOf[StringSerdeFactory].getName, defaultSerdeFactoryFromSerdeName("string")) // throw SamzaException if can not find the correct serde var throwSamzaException = false try { - defaultSerdesFromSerdeName("otherName", "testSystemException", config) + defaultSerdeFactoryFromSerdeName("otherName") } catch { case e: SamzaException => throwSamzaException = true case _: Exception =>
