Repository: samza
Updated Branches:
  refs/heads/master 23fb2e1c0 -> fdb90e7e7


SAMZA-660: Added default serdes for changelog streams


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/fdb90e7e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/fdb90e7e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/fdb90e7e

Branch: refs/heads/master
Commit: fdb90e7e77993cd26b2c5c0bcbaa48c37e0e6e6f
Parents: 23fb2e1
Author: Tommy Becker <[email protected]>
Authored: Wed Apr 29 13:36:49 2015 -0700
Committer: Yan Fang <[email protected]>
Committed: Wed Apr 29 13:36:49 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 34 +++++++++-----------
 .../samza/container/TestSamzaContainer.scala    | 18 +++++------
 2 files changed, 25 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/fdb90e7e/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 56819e0..ac4793a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -113,23 +113,21 @@ object SamzaContainer extends Logging {
   }
 
   /**
-   * A helper function which returns system's default serde according to the
+   * A helper function which returns system's default serde factory class 
according to the
    * serde name. If not found, throw exception.
    */
-  def defaultSerdesFromSerdeName(serdeName: String, exceptionSystemName: 
String, config: Config) = {
+  def defaultSerdeFactoryFromSerdeName(serdeName: String) = {
     info("looking for default serdes")
-    def getSerde(serdeFactory: String) = {
-      Util.getObj[SerdeFactory[Object]](serdeFactory).getSerde(serdeName, 
config)
-    }
+
     val serde = serdeName match {
-      case "byte" => getSerde(classOf[ByteSerdeFactory].getCanonicalName)
-      case "bytebuffer" => 
getSerde(classOf[ByteBufferSerdeFactory].getCanonicalName)
-      case "integer" => getSerde(classOf[IntegerSerdeFactory].getCanonicalName)
-      case "json" => getSerde(classOf[JsonSerdeFactory].getCanonicalName)
-      case "long" => getSerde(classOf[LongSerdeFactory].getCanonicalName)
-      case "serializable" => 
getSerde(classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName)
-      case "string" => getSerde(classOf[StringSerdeFactory].getCanonicalName)
-      case _ => throw new SamzaException("Serde %s for system %s does not 
exist in configuration." format (serdeName, exceptionSystemName))
+      case "byte" => classOf[ByteSerdeFactory].getCanonicalName
+      case "bytebuffer" => classOf[ByteBufferSerdeFactory].getCanonicalName
+      case "integer" => classOf[IntegerSerdeFactory].getCanonicalName
+      case "json" => classOf[JsonSerdeFactory].getCanonicalName
+      case "long" => classOf[LongSerdeFactory].getCanonicalName
+      case "serializable" => 
classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
+      case "string" => classOf[StringSerdeFactory].getCanonicalName
+      case _ => throw new SamzaException("No class defined for serde %s" 
format serdeName)
     }
     info("use default serde %s for %s" format (serde, serdeName))
     serde
@@ -233,7 +231,7 @@ object SamzaContainer extends Logging {
     val serdes = serdeNames.map(serdeName => {
       val serdeClassName = config
         .getSerdeClass(serdeName)
-        .getOrElse(throw new SamzaException("No class defined for serde: %s." 
format serdeName))
+        .getOrElse(defaultSerdeFactoryFromSerdeName(serdeName))
 
       val serde = Util.getObj[SerdeFactory[Object]](serdeClassName)
         .getSerde(serdeName, config)
@@ -251,7 +249,7 @@ object SamzaContainer extends Logging {
         .filter(getSerdeName(_).isDefined)
         .map(systemName => {
           val serdeName = getSerdeName(systemName).get
-          val serde = serdes.getOrElse(serdeName, 
defaultSerdesFromSerdeName(serdeName, systemName, config))
+          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
           (systemName, serde)
         }).toMap
     }
@@ -264,7 +262,7 @@ object SamzaContainer extends Logging {
         .filter(systemStream => getSerdeName(systemStream).isDefined)
         .map(systemStream => {
           val serdeName = getSerdeName(systemStream).get
-          val serde = serdes.getOrElse(serdeName, 
defaultSerdesFromSerdeName(serdeName, systemStream.toString, config))
+          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
           (systemStream, serde)
         }).toMap
     }
@@ -454,11 +452,11 @@ object SamzaContainer extends Logging {
               null
             }
             val keySerde = config.getStorageKeySerde(storeName) match {
-              case Some(keySerde) => serdes(keySerde)
+              case Some(keySerde) => serdes.getOrElse(keySerde, throw new 
SamzaException("No class defined for serde: %s." format keySerde))
               case _ => null
             }
             val msgSerde = config.getStorageMsgSerde(storeName) match {
-              case Some(msgSerde) => serdes(msgSerde)
+              case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new 
SamzaException("No class defined for serde: %s." format msgSerde))
               case _ => null
             }
             val storePartitionDir = 
TaskStorageManager.getStorePartitionDir(storeBaseDir, storeName, taskName)

http://git-wip-us.apache.org/repos/asf/samza/blob/fdb90e7e/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 58d7fe8..f1e9d0e 100644
--- 
a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ 
b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -166,7 +166,7 @@ class TestSamzaContainer extends AssertionsForJUnit {
       SamzaContainer.safeMain(() => null, exceptionHandler)
     } catch {
       case _: Exception =>
-      // Expect some random exception from SamzaContainer because we haven't 
+      // Expect some random exception from SamzaContainer because we haven't
       // set any environment variables for container ID, etc.
     }
     assertFalse(caughtException)
@@ -179,20 +179,20 @@ class TestSamzaContainer extends AssertionsForJUnit {
   }
 
   @Test
-  def testDefaultSerdesFromSerdeName {
+  def testDefaultSerdeFactoryFromSerdeName {
     import SamzaContainer._
     val config = new MapConfig
-    assertTrue(defaultSerdesFromSerdeName("byte", "testSystemException", 
config).isInstanceOf[ByteSerde])
-    assertTrue(defaultSerdesFromSerdeName("integer", "testSystemException", 
config).isInstanceOf[IntegerSerde])
-    assertTrue(defaultSerdesFromSerdeName("json", "testSystemException", 
config).isInstanceOf[JsonSerde[Object]])
-    assertTrue(defaultSerdesFromSerdeName("long", "testSystemException", 
config).isInstanceOf[LongSerde])
-    assertTrue(defaultSerdesFromSerdeName("serializable", 
"testSystemException", 
config).isInstanceOf[SerializableSerde[java.io.Serializable @unchecked]])
-    assertTrue(defaultSerdesFromSerdeName("string", "testSystemException", 
config).isInstanceOf[StringSerde])
+    assertEquals(classOf[ByteSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("byte"))
+    assertEquals(classOf[IntegerSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("integer"))
+    assertEquals(classOf[JsonSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("json"))
+    assertEquals(classOf[LongSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("long"))
+    
assertEquals(classOf[SerializableSerdeFactory[java.io.Serializable@unchecked]].getName,
 defaultSerdeFactoryFromSerdeName("serializable"))
+    assertEquals(classOf[StringSerdeFactory].getName, 
defaultSerdeFactoryFromSerdeName("string"))
 
     // throw SamzaException if can not find the correct serde
     var throwSamzaException = false
     try {
-      defaultSerdesFromSerdeName("otherName", "testSystemException", config)
+      defaultSerdeFactoryFromSerdeName("otherName")
     } catch {
       case e: SamzaException => throwSamzaException = true
       case _: Exception =>

Reply via email to