This is an automated email from the ASF dual-hosted git repository.

shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 997c9f2  SAMZA-2176: Ignore the configuration keys with serialized null values from the coordinator stream. (#1010)
997c9f2 is described below

commit 997c9f26f72b39e20078b39e29c8f79d345c9e74
Author: shanthoosh <[email protected]>
AuthorDate: Sat Apr 27 09:44:57 2019 -0700

    SAMZA-2176: Ignore the configuration keys with serialized null values from the coordinator stream. (#1010)
---
 .../org/apache/samza/util/CoordinatorStreamUtil.scala   | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 2fca36c..485a55c 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -22,6 +22,7 @@
 package org.apache.samza.util
 
 import java.util
+import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.config._
 import org.apache.samza.system.{SystemFactory, SystemStream}
@@ -34,7 +35,7 @@ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import scala.collection.immutable.Map
 import scala.collection.JavaConverters._
 
-object CoordinatorStreamUtil {
+object CoordinatorStreamUtil extends Logging {
   /**
     * Given a job's full config object, build a subset config which includes
     * only the job name, job id, and system config for the coordinator stream.
@@ -106,9 +107,17 @@ object CoordinatorStreamUtil {
     val configFromCoordinatorStream: util.Map[String, Array[Byte]] = 
namespaceAwareCoordinatorStreamStore.all
     val configMap: util.Map[String, String] = new util.HashMap[String, String]
     for ((key: String, valueAsBytes: Array[Byte]) <- 
configFromCoordinatorStream.asScala) {
-      val valueSerde: CoordinatorStreamValueSerde = new 
CoordinatorStreamValueSerde(SetConfig.TYPE)
-      val valueAsString: String = valueSerde.fromBytes(valueAsBytes)
-      configMap.put(key, valueAsString)
+      if (valueAsBytes == null) {
+        warn("Value for key: %s in config is null. Ignoring it." format key)
+      } else {
+        val valueSerde: CoordinatorStreamValueSerde = new 
CoordinatorStreamValueSerde(SetConfig.TYPE)
+        val valueAsString: String = valueSerde.fromBytes(valueAsBytes)
+        if (StringUtils.isBlank(valueAsString)) {
+          warn("Value for key: %s in config is empty or null. Ignoring it." 
format key)
+        } else {
+          configMap.put(key, valueAsString)
+        }
+      }
     }
     new MapConfig(configMap)
   }

Reply via email to