This is an automated email from the ASF dual-hosted git repository.
shanthoosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 997c9f2 SAMZA-2176: Ignore the configuration keys with serialized
null values from the coordinator stream. (#1010)
997c9f2 is described below
commit 997c9f26f72b39e20078b39e29c8f79d345c9e74
Author: shanthoosh <[email protected]>
AuthorDate: Sat Apr 27 09:44:57 2019 -0700
SAMZA-2176: Ignore the configuration keys with serialized null values from
the coordinator stream. (#1010)
---
.../org/apache/samza/util/CoordinatorStreamUtil.scala | 17 +++++++++++++----
1 file changed, 13 insertions(+), 4 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
index 2fca36c..485a55c 100644
--- a/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
+++ b/samza-core/src/main/scala/org/apache/samza/util/CoordinatorStreamUtil.scala
@@ -22,6 +22,7 @@
package org.apache.samza.util
import java.util
+import org.apache.commons.lang3.StringUtils
import org.apache.samza.SamzaException
import org.apache.samza.config._
import org.apache.samza.system.{SystemFactory, SystemStream}
@@ -34,7 +35,7 @@ import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
import scala.collection.immutable.Map
import scala.collection.JavaConverters._
-object CoordinatorStreamUtil {
+object CoordinatorStreamUtil extends Logging {
/**
* Given a job's full config object, build a subset config which includes
* only the job name, job id, and system config for the coordinator stream.
@@ -106,9 +107,17 @@ object CoordinatorStreamUtil {
val configFromCoordinatorStream: util.Map[String, Array[Byte]] =
namespaceAwareCoordinatorStreamStore.all
val configMap: util.Map[String, String] = new util.HashMap[String, String]
for ((key: String, valueAsBytes: Array[Byte]) <-
configFromCoordinatorStream.asScala) {
- val valueSerde: CoordinatorStreamValueSerde = new
CoordinatorStreamValueSerde(SetConfig.TYPE)
- val valueAsString: String = valueSerde.fromBytes(valueAsBytes)
- configMap.put(key, valueAsString)
+ if (valueAsBytes == null) {
+ warn("Value for key: %s in config is null. Ignoring it." format key)
+ } else {
+ val valueSerde: CoordinatorStreamValueSerde = new
CoordinatorStreamValueSerde(SetConfig.TYPE)
+ val valueAsString: String = valueSerde.fromBytes(valueAsBytes)
+ if (StringUtils.isBlank(valueAsString)) {
+ warn("Value for key: %s in config is empty or null. Ignoring it."
format key)
+ } else {
+ configMap.put(key, valueAsString)
+ }
+ }
}
new MapConfig(configMap)
}