Repository: samza Updated Branches: refs/heads/master 03e5026cf -> c51693bcd
SAMZA-1489: TaskInstance should commit offset before it closes() if auto commit is enabled Author: Dong Lin <[email protected]> Reviewers: Jagadish<[email protected]> Closes #417 from lindong28/SAMZA-1489 Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c51693bc Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c51693bc Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c51693bc Branch: refs/heads/master Commit: c51693bcdfb67a96a53c0dd9075e4c552d201eb2 Parents: 03e5026 Author: Dong Lin <[email protected]> Authored: Thu Feb 1 12:07:56 2018 -0800 Committer: Jagadish <[email protected]> Committed: Thu Feb 1 12:07:56 2018 -0800 ---------------------------------------------------------------------- .../apache/samza/container/RunLoopFactory.java | 17 +++------ .../org/apache/samza/config/TaskConfig.scala | 40 +++++++++++--------- .../apache/samza/container/SamzaContainer.scala | 6 +++ 3 files changed, 35 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c51693bc/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java index f19c240..d399fd0 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java @@ -28,11 +28,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; import scala.runtime.AbstractFunction1; - import java.util.concurrent.ExecutorService; import static org.apache.samza.util.Util.asScalaClock; -import static org.apache.samza.util.ScalaToJavaUtils.defaultValue; + /** * Factory class to create runloop for a Samza task, based on the type @@ -41,10 +40,6 @@ import static org.apache.samza.util.ScalaToJavaUtils.defaultValue; public class RunLoopFactory { private static final Logger log = LoggerFactory.getLogger(RunLoopFactory.class); - private static final long DEFAULT_WINDOW_MS = -1L; - private static final long DEFAULT_COMMIT_MS = 60000L; - private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L; - public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance> taskInstances, SystemConsumers consumerMultiplexer, ExecutorService threadPool, @@ -53,11 +48,11 @@ public class RunLoopFactory { TaskConfig config, HighResolutionClock clock) { - long taskWindowMs = config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS)); + long taskWindowMs = config.getWindowMs(); log.info("Got window milliseconds: {}.", taskWindowMs); - long taskCommitMs = config.getCommitMs().getOrElse(defaultValue(DEFAULT_COMMIT_MS)); + long taskCommitMs = config.getCommitMs(); log.info("Got commit milliseconds: {}.", taskCommitMs); @@ -85,15 +80,15 @@ public class RunLoopFactory { taskCommitMs, asScalaClock(() -> System.nanoTime())); } else { - Integer taskMaxConcurrency = config.getMaxConcurrency().getOrElse(defaultValue(1)); + Integer taskMaxConcurrency = config.getMaxConcurrency(); log.info("Got taskMaxConcurrency: {}.", taskMaxConcurrency); - boolean isAsyncCommitEnabled = config.getAsyncCommit().getOrElse(defaultValue(false)); + boolean isAsyncCommitEnabled = config.getAsyncCommit(); log.info("Got asyncCommitEnabled: {}.", isAsyncCommitEnabled); - Long callbackTimeout = config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS)); + Long callbackTimeout = config.getCallbackTimeoutMs(); log.info("Got callbackTimeout: {}.", callbackTimeout); http://git-wip-us.apache.org/repos/asf/samza/blob/c51693bc/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala index 419e15b..8b9a72b 100644 --- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala +++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala @@ -19,6 +19,7 @@ package org.apache.samza.config +import org.apache.samza.container.RunLoopFactory import org.apache.samza.system.SystemStream import org.apache.samza.util.{Logging, Util} @@ -43,6 +44,11 @@ object TaskConfig { val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms" // timeout period for triggering a callback val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a AsyncStreamTask + val DEFAULT_WINDOW_MS: Long = -1L + val DEFAULT_COMMIT_MS = 60000L + val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L + val DEFAULT_MAX_CONCURRENCY: Int = 1 + /** * Samza's container polls for more messages under two conditions. The first * condition arises when there are simply no remaining buffered messages to @@ -75,14 +81,14 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging { case _ => Set[SystemStream]() } - def getWindowMs: Option[Long] = getOption(TaskConfig.WINDOW_MS) match { - case Some(ms) => Some(ms.toLong) - case _ => None + def getWindowMs: Long = getOption(TaskConfig.WINDOW_MS) match { + case Some(ms) => ms.toLong + case _ => TaskConfig.DEFAULT_WINDOW_MS } - def getCommitMs: Option[Long] = getOption(TaskConfig.COMMIT_MS) match { - case Some(ms) => Some(ms.toLong) - case _ => None + def getCommitMs: Long = getOption(TaskConfig.COMMIT_MS) match { + case Some(ms) => ms.toLong + case _ => TaskConfig.DEFAULT_COMMIT_MS } def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match { @@ -123,23 +129,23 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging { } } - def getMaxConcurrency: Option[Int] = getOption(TaskConfig.MAX_CONCURRENCY) match { - case Some(count) => Some(count.toInt) - case _ => None + def getMaxConcurrency: Int = getOption(TaskConfig.MAX_CONCURRENCY) match { + case Some(count) => count.toInt + case _ => TaskConfig.DEFAULT_MAX_CONCURRENCY } - def getCallbackTimeoutMs: Option[Long] = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) match { - case Some(ms) => Some(ms.toLong) - case _ => None + def getCallbackTimeoutMs: Long = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) match { + case Some(ms) => ms.toLong + case _ => TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS } - def getAsyncCommit: Option[Boolean] = getOption(TaskConfig.ASYNC_COMMIT) match { - case Some(asyncCommit) => Some(asyncCommit.toBoolean) - case _ => None + def getAsyncCommit: Boolean = getOption(TaskConfig.ASYNC_COMMIT) match { + case Some(asyncCommit) => asyncCommit.toBoolean + case _ => false } - def isAutoCommitEnabled() = getOption(TaskConfig.COMMIT_MS) match { + def isAutoCommitEnabled: Boolean = getOption(TaskConfig.COMMIT_MS) match { case Some(commitMs) => commitMs.toInt > 0 - case _ => true + case _ => TaskConfig.DEFAULT_COMMIT_MS > 0 } } http://git-wip-us.apache.org/repos/asf/samza/blob/c51693bc/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala index 354a8e7..fda654d 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala @@ -656,6 +656,7 @@ class SamzaContainer( val shutdownMs = containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS) var shutdownHookThread: Thread = null var jmxServer: JmxServer = null + val isAutoCommitEnabled = containerContext.config.isAutoCommitEnabled @volatile private var status = SamzaContainerStatus.NOT_STARTED private var exceptionSeen: Throwable = null @@ -991,6 +992,11 @@ class SamzaContainer( } } + if (isAutoCommitEnabled) { + info("Committing offsets for all task instances") + taskInstances.values.foreach(_.commit) + } + taskInstances.values.foreach(_.shutdownTask) }
