This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 2aced58  SAMZA-2164: Close the consumer after reading checkpoints 
(#993)
2aced58 is described below

commit 2aced584edb8ae52cf09e7fc0fb66d1ba562d9be
Author: xinyuiscool <[email protected]>
AuthorDate: Fri Apr 12 17:42:54 2019 -0700

    SAMZA-2164: Close the consumer after reading checkpoints (#993)
---
 .../samza/checkpoint/kafka/KafkaCheckpointManager.scala      | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 4479c2d..3c96b2e 100644
--- 
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ 
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -106,6 +106,7 @@ class KafkaCheckpointManager(checkpointSpec: 
KafkaStreamSpec,
     info(s"Starting the checkpoint SystemConsumer from oldest offset 
$oldestOffset")
     systemConsumer.register(checkpointSsp, oldestOffset)
     systemConsumer.start()
+    // the consumer will be closed after first time reading the checkpoint
   }
 
   /**
@@ -128,11 +129,11 @@ class KafkaCheckpointManager(checkpointSpec: 
KafkaStreamSpec,
     info(s"Reading checkpoint for taskName $taskName")
 
     if (taskNamesToCheckpoints == null) {
-      debug("Reading checkpoints for the first time")
+      info("Reading checkpoints for the first time")
       taskNamesToCheckpoints = readCheckpoints()
-    } else {
-      debug("Updating existing checkpoint mappings")
-      taskNamesToCheckpoints ++= readCheckpoints()
+      // Stop the system consumer since we only need to read checkpoints once
+      info("Stopping system consumer.")
+      systemConsumer.stop()
     }
 
     val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName, 
null)
@@ -218,9 +219,6 @@ class KafkaCheckpointManager(checkpointSpec: 
KafkaStreamSpec,
     info ("Stopping system producer.")
     producerRef.get().stop()
 
-    info("Stopping system consumer.")
-    systemConsumer.stop()
-
     info("CheckpointManager stopped.")
   }
 

Reply via email to