This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new aa57b9e Undoing shutdown-ms change to allow for a simpler refactor
(#1068)
aa57b9e is described below
commit aa57b9ef1b0a89b475ff9334176d00b7ff728667
Author: rmatharu <[email protected]>
AuthorDate: Wed Jun 5 13:33:02 2019 -0700
Undoing shutdown-ms change to allow for a simpler refactor (#1068)
For context, the change being undone was introduced in
https://github.com/apache/samza/pull/1021.
---
samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala | 5 +++++
.../src/main/scala/org/apache/samza/container/SamzaContainer.scala | 2 +-
2 files changed, 6 insertions(+), 1 deletion(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 99ad37e..a887355 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -96,6 +96,11 @@ class TaskConfig(config: Config) extends
ScalaMapConfig(config) with Logging {
case _ => TaskConfig.DEFAULT_COMMIT_MS
}
+ def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
+ case Some(ms) => Some(ms.toLong)
+ case _ => None
+ }
+
def getTaskClass = getOption(TaskConfig.TASK_CLASS)
def getCommandClass = getOption(TaskConfig.COMMAND_BUILDER)
diff --git
a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 93216b9..e1cd92a 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -713,7 +713,7 @@ class SamzaContainer(
containerStorageManager: ContainerStorageManager,
diagnosticsManager: Option[DiagnosticsManager] = Option.empty) extends
Runnable with Logging {
- val shutdownMs = new TaskConfigJava(config).getShutdownMs
+ val shutdownMs =
config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
var shutdownHookThread: Thread = null
var jmxServer: JmxServer = null