This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new b193d45  SAMZA-2493: Keep checkpoint manager consumer open for repeated polling (#1327)
b193d45 is described below

commit b193d45a8cd53996a0272817cc37a59255058092
Author: bkonold <[email protected]>
AuthorDate: Tue Apr 7 09:44:23 2020 -0700

    SAMZA-2493: Keep checkpoint manager consumer open for repeated polling (#1327)
---
 .../java/org/apache/samza/config/TaskConfig.java   | 10 +++
 .../apache/samza/container/SamzaContainer.scala    | 10 ++-
 .../checkpoint/kafka/KafkaCheckpointManager.scala  | 22 +++++--
 .../kafka/TestKafkaCheckpointManager.scala         | 71 +++++++++++++++++++---
 4 files changed, 100 insertions(+), 13 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
index b02f6c9..468d9c9 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java
@@ -105,6 +105,8 @@ public class TaskConfig extends MapConfig {
   private static final String BROADCAST_STREAM_PATTERN = "^[\\d]+$";
   private static final String BROADCAST_STREAM_RANGE_PATTERN = "^\\[[\\d]+\\-[\\d]+\\]$";
   public static final String CHECKPOINT_MANAGER_FACTORY = "task.checkpoint.factory";
+  // standby containers use this flag to indicate that checkpoints will be polled continually, rather than only once at startup like in an active container
+  public static final String INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ = "samza.internal.task.checkpoint.consumer.stop.after.first.read";
 
   public static final String TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = "task.transactional.state.checkpoint.enabled";
   private static final boolean DEFAULT_TRANSACTIONAL_STATE_CHECKPOINT_ENABLED = true;
@@ -214,6 +216,14 @@ public class TaskConfig extends MapConfig {
   }
 
   /**
+   * Internal config to indicate whether the SystemConsumer underlying a CheckpointManager should be stopped after
+   * initial read of checkpoints.
+   */
+  public boolean getCheckpointManagerConsumerStopAfterFirstRead() {
+    return getBoolean(INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, true);
+  }
+
+  /**
    * Get the systemStreamPartitions of the broadcast stream. Specifying
    * one partition for one stream or a range of the partitions for one
    * stream is allowed.
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c78e841..6fab351 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -50,6 +50,7 @@ import org.apache.samza.task._
 import org.apache.samza.util.ScalaJavaUtil.JavaOptionals
 import org.apache.samza.util.{Util, _}
 import org.apache.samza.SamzaException
+import org.apache.samza.clustermanager.StandbyTaskUtil
 
 import scala.collection.JavaConverters._
 
@@ -132,7 +133,14 @@ object SamzaContainer extends Logging {
     localityManager: LocalityManager = null,
     startpointManager: StartpointManager = null,
     diagnosticsManager: Option[DiagnosticsManager] = Option.empty) = {
-    val config = jobContext.getConfig
+    val config = if (StandbyTaskUtil.isStandbyContainer(containerId)) {
+      // standby containers will need to continually poll checkpoint messages
+      val newConfig = new util.HashMap[String, String](jobContext.getConfig)
+      newConfig.put(TaskConfig.INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, java.lang.Boolean.FALSE.toString)
+      new MapConfig(newConfig)
+    } else {
+      jobContext.getConfig
+    }
     val jobConfig = new JobConfig(config)
     val systemConfig = new SystemConfig(config)
     val containerModel = jobModel.getContainers.get(containerId)
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 87c84aa..1c3531f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.base.Preconditions
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
-import org.apache.samza.config.{Config, JobConfig}
+import org.apache.samza.config.{Config, JobConfig, TaskConfig}
 import org.apache.samza.container.TaskName
 import org.apache.samza.serializers.Serde
 import org.apache.samza.metrics.MetricsRegistry
@@ -76,6 +76,11 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
   val producerRef: AtomicReference[SystemProducer] = new AtomicReference[SystemProducer](getSystemProducer())
   val producerCreationLock: Object = new Object
 
+  // if true, systemConsumer can be safely closed after the first call to readLastCheckpoint.
+  // if false, it must be left open until KafkaCheckpointManager::stop is called.
+  // for active containers, this will be set to true, while false for standby containers.
+  val stopConsumerAfterFirstRead: Boolean = new TaskConfig(config).getCheckpointManagerConsumerStopAfterFirstRead
+
   /**
     * Create checkpoint stream prior to start.
     */
@@ -107,7 +112,6 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     info(s"Starting the checkpoint SystemConsumer from oldest offset $oldestOffset")
     systemConsumer.register(checkpointSsp, oldestOffset)
     systemConsumer.start()
-    // the consumer will be closed after first time reading the checkpoint
   }
 
   /**
@@ -132,9 +136,12 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     if (taskNamesToCheckpoints == null) {
       info("Reading checkpoints for the first time")
       taskNamesToCheckpoints = readCheckpoints()
-      // Stop the system consumer since we only need to read checkpoints once
-      info("Stopping system consumer.")
-      systemConsumer.stop()
+      if (stopConsumerAfterFirstRead) {
+        info("Stopping system consumer")
+        systemConsumer.stop()
+      }
+    } else if (!stopConsumerAfterFirstRead) {
+      taskNamesToCheckpoints ++= readCheckpoints()
     }
 
     val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName, null)
@@ -220,6 +227,11 @@ class KafkaCheckpointManager(checkpointSpec: KafkaStreamSpec,
     info ("Stopping system producer.")
     producerRef.get().stop()
 
+    if (!stopConsumerAfterFirstRead) {
+      info("Stopping system consumer")
+      systemConsumer.stop()
+    }
+
     info("CheckpointManager stopped.")
   }
 
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 9766ce8..2e7a7e4 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -38,6 +38,7 @@ import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
 import org.junit._
 import org.mockito.Mockito
+import org.mockito.Matchers
 
 class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
@@ -129,18 +130,13 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
   def testWriteCheckpointShouldRetryFiniteTimesOnFailure(): Unit = {
     val checkpointTopic = "checkpoint-topic-2"
     val mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])
-
-    class MockSystemFactory extends KafkaSystemFactory {
-      override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
-        mockKafkaProducer
-      }
-    }
+    val mockKafkaSystemConsumer: SystemConsumer = Mockito.mock(classOf[SystemConsumer])
 
     Mockito.doThrow(new RuntimeException()).when(mockKafkaProducer).flush(taskName.getTaskName)
 
     val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
     val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, props)
-    val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory, false, config, new NoOpMetricsRegistry)
+    val checkPointManager = new KafkaCheckpointManager(spec, new MockSystemFactory(mockKafkaSystemConsumer, mockKafkaProducer), false, config, new NoOpMetricsRegistry)
     checkPointManager.MaxRetryDurationInMillis = 1
 
     try {
@@ -186,6 +182,55 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     kcm.stop()
   }
 
+  @Test
+  def testConsumerStopsAfterInitialReadIfConfigSetTrue(): Unit = {
+    val mockKafkaSystemConsumer: SystemConsumer = Mockito.mock(classOf[SystemConsumer])
+
+    val checkpointTopic = "checkpoint-topic-test"
+    val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
+    val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, props)
+
+    val configMapWithOverride = new java.util.HashMap[String, String](config)
+    configMapWithOverride.put(TaskConfig.INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, "true")
+    val kafkaCheckpointManager = new KafkaCheckpointManager(spec, new MockSystemFactory(mockKafkaSystemConsumer), false, new MapConfig(configMapWithOverride), new NoOpMetricsRegistry)
+
+    kafkaCheckpointManager.register(taskName)
+    kafkaCheckpointManager.start()
+    kafkaCheckpointManager.readLastCheckpoint(taskName)
+
+    Mockito.verify(mockKafkaSystemConsumer, Mockito.times(1)).register(Matchers.any(), Matchers.any())
+    Mockito.verify(mockKafkaSystemConsumer, Mockito.times(1)).start()
+    Mockito.verify(mockKafkaSystemConsumer, Mockito.times(1)).poll(Matchers.any(), Matchers.any())
+    Mockito.verify(mockKafkaSystemConsumer, Mockito.times(1)).stop()
+
+    kafkaCheckpointManager.stop()
+
+    Mockito.verifyNoMoreInteractions(mockKafkaSystemConsumer)
+  }
+
+  @Test
+  def testConsumerDoesNotStopAfterInitialReadIfConfigSetFalse(): Unit = {
+    val mockKafkaSystemConsumer: SystemConsumer = Mockito.mock(classOf[SystemConsumer])
+
+    val checkpointTopic = "checkpoint-topic-test"
+    val props = new org.apache.samza.config.KafkaConfig(config).getCheckpointTopicProperties()
+    val spec = new KafkaStreamSpec("id", checkpointTopic, checkpointSystemName, 1, 1, props)
+
+    val configMapWithOverride = new java.util.HashMap[String, String](config)
+    configMapWithOverride.put(TaskConfig.INTERNAL_CHECKPOINT_MANAGER_CONSUMER_STOP_AFTER_FIRST_READ, "false")
+    val kafkaCheckpointManager = new KafkaCheckpointManager(spec, new MockSystemFactory(mockKafkaSystemConsumer), false, new MapConfig(configMapWithOverride), new NoOpMetricsRegistry)
+
+    kafkaCheckpointManager.register(taskName)
+    kafkaCheckpointManager.start()
+    kafkaCheckpointManager.readLastCheckpoint(taskName)
+
+    Mockito.verify(mockKafkaSystemConsumer, Mockito.times(0)).stop()
+
+    kafkaCheckpointManager.stop()
+
+    Mockito.verify(mockKafkaSystemConsumer, Mockito.times(1)).stop()
+  }
+
   @After
   override def tearDown(): Unit = {
     if (servers != null) {
@@ -243,4 +288,16 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     adminZkClient.createTopic(cpTopic, partNum, 1, props)
   }
 
+  class MockSystemFactory(
+    mockKafkaSystemConsumer: SystemConsumer = Mockito.mock(classOf[SystemConsumer]),
+    mockKafkaProducer: SystemProducer = Mockito.mock(classOf[SystemProducer])) extends KafkaSystemFactory {
+    override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
+      mockKafkaProducer
+    }
+
+    override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
+      mockKafkaSystemConsumer
+    }
+  }
+
 }

Reply via email to