Repository: samza
Updated Branches:
  refs/heads/master f4bd84bbb -> 312c1b17b


SAMZA-969: Does not allow empty serdes for System and SystemStreams


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/312c1b17
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/312c1b17
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/312c1b17

Branch: refs/heads/master
Commit: 312c1b17b047f778cb2915b059f3d7f455da58fe
Parents: f4bd84b
Author: Boris Shkolik <[email protected]>
Authored: Fri Jun 17 11:52:07 2016 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Fri Jun 17 11:52:07 2016 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/ScalaMapConfig.scala    |  8 ++++++++
 .../org/apache/samza/config/StreamConfig.scala      |  4 ++--
 .../org/apache/samza/config/SystemConfig.scala      |  4 ++--
 .../org/apache/samza/container/SamzaContainer.scala | 16 +++++++---------
 .../src/main/scala/org/apache/samza/util/Util.scala |  2 +-
 5 files changed, 20 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
index 5fb1f52..f648ced 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala
@@ -25,6 +25,14 @@ class ScalaMapConfig(config: Config) extends 
MapConfig(config) {
 
   def getOption(k: String): Option[String] = if (containsKey(k)) 
Some(config.get(k)) else None
 
+  def getNonEmptyOption(k: String): Option[String] = {
+    getOption(k) match {
+      case Some(v: String) if (!v.isEmpty) => Some(v)
+      case _ => None
+    }
+  }
+
+
   def getExcept(k: String, msg: String = null): String = 
     getOption(k) match {
       case Some(s) => s

http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
index e172589..0ccc7df 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala
@@ -36,10 +36,10 @@ object StreamConfig {
 
 class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging 
{
   def getStreamMsgSerde(systemStream: SystemStream) =
-    getOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, 
systemStream.getStream))
+    getNonEmptyOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, 
systemStream.getStream))
 
   def getStreamKeySerde(systemStream: SystemStream) =
-    getOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, 
systemStream.getStream))
+    getNonEmptyOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, 
systemStream.getStream))
 
   def getResetOffsetMap(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)

http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala 
b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
index bf974df..3295394 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala
@@ -36,9 +36,9 @@ object SystemConfig {
 class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging 
{
   def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY 
format name)
 
-  def getSystemKeySerde(name: String) = getOption(SystemConfig.KEY_SERDE 
format name)
+  def getSystemKeySerde(name: String) = 
getNonEmptyOption(SystemConfig.KEY_SERDE format name)
 
-  def getSystemMsgSerde(name: String) = getOption(SystemConfig.MSG_SERDE 
format name)
+  def getSystemMsgSerde(name: String) = 
getNonEmptyOption(SystemConfig.MSG_SERDE format name)
 
   def getDefaultSystemOffset(systemName: String) = 
getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName))
 

http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala 
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 086531e..2e8a500 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -155,7 +155,7 @@ object SamzaContainer extends Logging {
 
     val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ 
config.getSerdeStreams(_))
 
-    debug("Got serde streams: %s" format serdeStreams)
+    info("Got serde streams: %s" format serdeStreams)
 
     val serdeNames = config.getSerdeNames
 
@@ -230,12 +230,10 @@ object SamzaContainer extends Logging {
      */
     val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
       systemNames
-        .filter( sn => {
-          val serde = getSerdeName(sn)
-          serde.isDefined && !serde.get.equals("")
-        }).map(systemName => {
+        .filter(systemName => getSerdeName(systemName).isDefined)
+              .map(systemName => {
           val serdeName = getSerdeName(systemName).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
+          val serde = serdes.getOrElse(serdeName, throw new 
SamzaException("buildSystemSerdeMap: No class defined for serde: %s." format 
serdeName))
           (systemName, serde)
         }).toMap
     }
@@ -248,7 +246,7 @@ object SamzaContainer extends Logging {
         .filter(systemStream => getSerdeName(systemStream).isDefined)
         .map(systemStream => {
           val serdeName = getSerdeName(systemStream).get
-          val serde = serdes.getOrElse(serdeName, throw new SamzaException("No 
class defined for serde: %s." format serdeName))
+          val serde = serdes.getOrElse(serdeName, throw new 
SamzaException("buildSystemStreamSerdeMap: No class defined for serde: %s." 
format serdeName))
           (systemStream, serde)
         }).toMap
     }
@@ -459,11 +457,11 @@ object SamzaContainer extends Logging {
               null
             }
             val keySerde = config.getStorageKeySerde(storeName) match {
-              case Some(keySerde) => serdes.getOrElse(keySerde, throw new 
SamzaException("No class defined for serde: %s." format keySerde))
+              case Some(keySerde) => serdes.getOrElse(keySerde, throw new 
SamzaException("StorageKeySerde: No class defined for serde: %s." format 
keySerde))
               case _ => null
             }
             val msgSerde = config.getStorageMsgSerde(storeName) match {
-              case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new 
SamzaException("No class defined for serde: %s." format msgSerde))
+              case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new 
SamzaException("StorageMsgSerde: No class defined for serde: %s." format 
msgSerde))
               case _ => null
             }
             val storeBaseDir = if(changeLogSystemStreamPartition != null) {

http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/util/Util.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala 
b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
index fc3d085..c77d929 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala
@@ -359,7 +359,7 @@ object Util extends Logging {
       case "long" => classOf[LongSerdeFactory].getCanonicalName
       case "serializable" => 
classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName
       case "string" => classOf[StringSerdeFactory].getCanonicalName
-      case _ => throw new SamzaException("No class defined for serde %s" 
format serdeName)
+      case _ => throw new SamzaException("defaultSerdeFactoryFromSerdeName: No 
class defined for serde %s" format serdeName)
     }
     info("use default serde %s for %s" format (serde, serdeName))
     serde

Reply via email to