Repository: samza Updated Branches: refs/heads/master f4bd84bbb -> 312c1b17b
SAMZA-969: Does not allow empty serdes for System and SystemStreams Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/312c1b17 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/312c1b17 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/312c1b17 Branch: refs/heads/master Commit: 312c1b17b047f778cb2915b059f3d7f455da58fe Parents: f4bd84b Author: Boris Shkolik <[email protected]> Authored: Fri Jun 17 11:52:07 2016 -0700 Committer: Yi Pan (Data Infrastructure) <[email protected]> Committed: Fri Jun 17 11:52:07 2016 -0700 ---------------------------------------------------------------------- .../org/apache/samza/config/ScalaMapConfig.scala | 8 ++++++++ .../org/apache/samza/config/StreamConfig.scala | 4 ++-- .../org/apache/samza/config/SystemConfig.scala | 4 ++-- .../org/apache/samza/container/SamzaContainer.scala | 16 +++++++--------- .../src/main/scala/org/apache/samza/util/Util.scala | 2 +- 5 files changed, 20 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala index 5fb1f52..f648ced 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/ScalaMapConfig.scala @@ -25,6 +25,14 @@ class ScalaMapConfig(config: Config) extends MapConfig(config) { def getOption(k: String): Option[String] = if (containsKey(k)) Some(config.get(k)) else None + def getNonEmptyOption(k: String): Option[String] = { + getOption(k) match { + case Some(v: String) if (!v.isEmpty) => Some(v) + case _ => None + } + } + + def getExcept(k: String, msg: String = null): String = getOption(k) match { case Some(s) => s http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala index e172589..0ccc7df 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/StreamConfig.scala @@ -36,10 +36,10 @@ object StreamConfig { class StreamConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getStreamMsgSerde(systemStream: SystemStream) = - getOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, systemStream.getStream)) + getNonEmptyOption(StreamConfig.MSG_SERDE format (systemStream.getSystem, systemStream.getStream)) def getStreamKeySerde(systemStream: SystemStream) = - getOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, systemStream.getStream)) + getNonEmptyOption(StreamConfig.KEY_SERDE format (systemStream.getSystem, systemStream.getStream)) def getResetOffsetMap(systemName: String) = { val subConf = config.subset("systems.%s.streams." format systemName, true) http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala index bf974df..3295394 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/SystemConfig.scala @@ -36,9 +36,9 @@ object SystemConfig { class SystemConfig(config: Config) extends ScalaMapConfig(config) with Logging { def getSystemFactory(name: String) = getOption(SystemConfig.SYSTEM_FACTORY format name) - def getSystemKeySerde(name: String) = getOption(SystemConfig.KEY_SERDE format name) + def getSystemKeySerde(name: String) = getNonEmptyOption(SystemConfig.KEY_SERDE format name) - def getSystemMsgSerde(name: String) = getOption(SystemConfig.MSG_SERDE format name) + def getSystemMsgSerde(name: String) = getNonEmptyOption(SystemConfig.MSG_SERDE format name) def getDefaultSystemOffset(systemName: String) = getOption(SystemConfig.CONSUMER_OFFSET_DEFAULT format (systemName)) http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 086531e..2e8a500 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -155,7 +155,7 @@ object SamzaContainer extends Logging { val serdeStreams = systemNames.foldLeft(Set[SystemStream]())(_ ++ config.getSerdeStreams(_)) - debug("Got serde streams: %s" format serdeStreams) + info("Got serde streams: %s" format serdeStreams) val serdeNames = config.getSerdeNames @@ -230,12 +230,10 @@ object SamzaContainer extends Logging { */ val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => { systemNames - .filter( sn => { - val serde = getSerdeName(sn) - serde.isDefined && !serde.get.equals("") - }).map(systemName => { + .filter(systemName => getSerdeName(systemName).isDefined) + .map(systemName => { val serdeName = getSerdeName(systemName).get - val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." format serdeName)) + val serde = serdes.getOrElse(serdeName, throw new SamzaException("buildSystemSerdeMap: No class defined for serde: %s." format serdeName)) (systemName, serde) }).toMap } @@ -248,7 +246,7 @@ object SamzaContainer extends Logging { .filter(systemStream => getSerdeName(systemStream).isDefined) .map(systemStream => { val serdeName = getSerdeName(systemStream).get - val serde = serdes.getOrElse(serdeName, throw new SamzaException("No class defined for serde: %s." format serdeName)) + val serde = serdes.getOrElse(serdeName, throw new SamzaException("buildSystemStreamSerdeMap: No class defined for serde: %s." format serdeName)) (systemStream, serde) }).toMap } @@ -459,11 +457,11 @@ object SamzaContainer extends Logging { null } val keySerde = config.getStorageKeySerde(storeName) match { - case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("No class defined for serde: %s." format keySerde)) + case Some(keySerde) => serdes.getOrElse(keySerde, throw new SamzaException("StorageKeySerde: No class defined for serde: %s." format keySerde)) case _ => null } val msgSerde = config.getStorageMsgSerde(storeName) match { - case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("No class defined for serde: %s." format msgSerde)) + case Some(msgSerde) => serdes.getOrElse(msgSerde, throw new SamzaException("StorageMsgSerde: No class defined for serde: %s." format msgSerde)) case _ => null } val storeBaseDir = if(changeLogSystemStreamPartition != null) { http://git-wip-us.apache.org/repos/asf/samza/blob/312c1b17/samza-core/src/main/scala/org/apache/samza/util/Util.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/util/Util.scala b/samza-core/src/main/scala/org/apache/samza/util/Util.scala index fc3d085..c77d929 100644 --- a/samza-core/src/main/scala/org/apache/samza/util/Util.scala +++ b/samza-core/src/main/scala/org/apache/samza/util/Util.scala @@ -359,7 +359,7 @@ object Util extends Logging { case "long" => classOf[LongSerdeFactory].getCanonicalName case "serializable" => classOf[SerializableSerdeFactory[java.io.Serializable]].getCanonicalName case "string" => classOf[StringSerdeFactory].getCanonicalName - case _ => throw new SamzaException("No class defined for serde %s" format serdeName) + case _ => throw new SamzaException("defaultSerdeFactoryFromSerdeName: No class defined for serde %s" format serdeName) } info("use default serde %s for %s" format (serde, serdeName)) serde
