Repository: samza
Updated Branches:
  refs/heads/master 03e5026cf -> c51693bcd


SAMZA-1489: TaskInstance should commit offsets before it shuts down if auto-commit is enabled

Author: Dong Lin <[email protected]>

Reviewers: Jagadish <[email protected]>

Closes #417 from lindong28/SAMZA-1489


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c51693bc
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c51693bc
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c51693bc

Branch: refs/heads/master
Commit: c51693bcdfb67a96a53c0dd9075e4c552d201eb2
Parents: 03e5026
Author: Dong Lin <[email protected]>
Authored: Thu Feb 1 12:07:56 2018 -0800
Committer: Jagadish <[email protected]>
Committed: Thu Feb 1 12:07:56 2018 -0800

----------------------------------------------------------------------
 .../apache/samza/container/RunLoopFactory.java  | 17 +++------
 .../org/apache/samza/config/TaskConfig.scala    | 40 +++++++++++---------
 .../apache/samza/container/SamzaContainer.scala |  6 +++
 3 files changed, 35 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c51693bc/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
index f19c240..d399fd0 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoopFactory.java
@@ -28,11 +28,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.collection.JavaConverters;
 import scala.runtime.AbstractFunction1;
-
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.samza.util.Util.asScalaClock;
-import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
+
 
 /**
  * Factory class to create runloop for a Samza task, based on the type
@@ -41,10 +40,6 @@ import static org.apache.samza.util.ScalaToJavaUtils.defaultValue;
 public class RunLoopFactory {
   private static final Logger log = 
LoggerFactory.getLogger(RunLoopFactory.class);
 
-  private static final long DEFAULT_WINDOW_MS = -1L;
-  private static final long DEFAULT_COMMIT_MS = 60000L;
-  private static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
-
   public static Runnable 
createRunLoop(scala.collection.immutable.Map<TaskName, TaskInstance> 
taskInstances,
       SystemConsumers consumerMultiplexer,
       ExecutorService threadPool,
@@ -53,11 +48,11 @@ public class RunLoopFactory {
       TaskConfig config,
       HighResolutionClock clock) {
 
-    long taskWindowMs = 
config.getWindowMs().getOrElse(defaultValue(DEFAULT_WINDOW_MS));
+    long taskWindowMs = config.getWindowMs();
 
     log.info("Got window milliseconds: {}.", taskWindowMs);
 
-    long taskCommitMs = 
config.getCommitMs().getOrElse(defaultValue(DEFAULT_COMMIT_MS));
+    long taskCommitMs = config.getCommitMs();
 
     log.info("Got commit milliseconds: {}.", taskCommitMs);
 
@@ -85,15 +80,15 @@ public class RunLoopFactory {
         taskCommitMs,
         asScalaClock(() -> System.nanoTime()));
     } else {
-      Integer taskMaxConcurrency = 
config.getMaxConcurrency().getOrElse(defaultValue(1));
+      Integer taskMaxConcurrency = config.getMaxConcurrency();
 
       log.info("Got taskMaxConcurrency: {}.", taskMaxConcurrency);
 
-      boolean isAsyncCommitEnabled = 
config.getAsyncCommit().getOrElse(defaultValue(false));
+      boolean isAsyncCommitEnabled = config.getAsyncCommit();
 
       log.info("Got asyncCommitEnabled: {}.", isAsyncCommitEnabled);
 
-      Long callbackTimeout = 
config.getCallbackTimeoutMs().getOrElse(defaultValue(DEFAULT_CALLBACK_TIMEOUT_MS));
+      Long callbackTimeout = config.getCallbackTimeoutMs();
 
       log.info("Got callbackTimeout: {}.", callbackTimeout);
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c51693bc/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 419e15b..8b9a72b 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -19,6 +19,7 @@
 
 package org.apache.samza.config
 
+import org.apache.samza.container.RunLoopFactory
 import org.apache.samza.system.SystemStream
 import org.apache.samza.util.{Logging, Util}
 
@@ -43,6 +44,11 @@ object TaskConfig {
   val CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms"  // timeout period for 
triggering a callback
   val ASYNC_COMMIT = "task.async.commit" // to enable async commit in a 
AsyncStreamTask
 
+  val DEFAULT_WINDOW_MS: Long = -1L
+  val DEFAULT_COMMIT_MS = 60000L
+  val DEFAULT_CALLBACK_TIMEOUT_MS: Long = -1L
+  val DEFAULT_MAX_CONCURRENCY: Int = 1
+
   /**
    * Samza's container polls for more messages under two conditions. The first
    * condition arises when there are simply no remaining buffered messages to
@@ -75,14 +81,14 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     case _ => Set[SystemStream]()
   }
 
-  def getWindowMs: Option[Long] = getOption(TaskConfig.WINDOW_MS) match {
-    case Some(ms) => Some(ms.toLong)
-    case _ => None
+  def getWindowMs: Long = getOption(TaskConfig.WINDOW_MS) match {
+    case Some(ms) => ms.toLong
+    case _ => TaskConfig.DEFAULT_WINDOW_MS
   }
 
-  def getCommitMs: Option[Long] = getOption(TaskConfig.COMMIT_MS) match {
-    case Some(ms) => Some(ms.toLong)
-    case _ => None
+  def getCommitMs: Long = getOption(TaskConfig.COMMIT_MS) match {
+    case Some(ms) => ms.toLong
+    case _ => TaskConfig.DEFAULT_COMMIT_MS
   }
 
   def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
@@ -123,23 +129,23 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
     }
   }
 
-  def getMaxConcurrency: Option[Int] = getOption(TaskConfig.MAX_CONCURRENCY) 
match {
-    case Some(count) => Some(count.toInt)
-    case _ => None
+  def getMaxConcurrency: Int = getOption(TaskConfig.MAX_CONCURRENCY) match {
+    case Some(count) => count.toInt
+    case _ => TaskConfig.DEFAULT_MAX_CONCURRENCY
   }
 
-  def getCallbackTimeoutMs: Option[Long] = 
getOption(TaskConfig.CALLBACK_TIMEOUT_MS) match {
-    case Some(ms) => Some(ms.toLong)
-    case _ => None
+  def getCallbackTimeoutMs: Long = getOption(TaskConfig.CALLBACK_TIMEOUT_MS) 
match {
+    case Some(ms) => ms.toLong
+    case _ => TaskConfig.DEFAULT_CALLBACK_TIMEOUT_MS
   }
 
-  def getAsyncCommit: Option[Boolean] = getOption(TaskConfig.ASYNC_COMMIT) 
match {
-    case Some(asyncCommit) => Some(asyncCommit.toBoolean)
-    case _ => None
+  def getAsyncCommit: Boolean = getOption(TaskConfig.ASYNC_COMMIT) match {
+    case Some(asyncCommit) => asyncCommit.toBoolean
+    case _ => false
   }
 
-  def isAutoCommitEnabled() = getOption(TaskConfig.COMMIT_MS) match {
+  def isAutoCommitEnabled: Boolean = getOption(TaskConfig.COMMIT_MS) match {
     case Some(commitMs) => commitMs.toInt > 0
-    case _ => true
+    case _ => TaskConfig.DEFAULT_COMMIT_MS > 0
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/c51693bc/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 354a8e7..fda654d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -656,6 +656,7 @@ class SamzaContainer(
   val shutdownMs = 
containerContext.config.getShutdownMs.getOrElse(TaskConfigJava.DEFAULT_TASK_SHUTDOWN_MS)
   var shutdownHookThread: Thread = null
   var jmxServer: JmxServer = null
+  val isAutoCommitEnabled = containerContext.config.isAutoCommitEnabled
 
   @volatile private var status = SamzaContainerStatus.NOT_STARTED
   private var exceptionSeen: Throwable = null
@@ -991,6 +992,11 @@ class SamzaContainer(
       }
     }
 
+    if (isAutoCommitEnabled) {
+      info("Committing offsets for all task instances")
+      taskInstances.values.foreach(_.commit)
+    }
+
     taskInstances.values.foreach(_.shutdownTask)
   }
 

Reply via email to