This is an automated email from the ASF dual-hosted git repository.
xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 2aced58 SAMZA-2164: Close the consumer after reading checkpoints
(#993)
2aced58 is described below
commit 2aced584edb8ae52cf09e7fc0fb66d1ba562d9be
Author: xinyuiscool <[email protected]>
AuthorDate: Fri Apr 12 17:42:54 2019 -0700
SAMZA-2164: Close the consumer after reading checkpoints (#993)
---
.../samza/checkpoint/kafka/KafkaCheckpointManager.scala | 12 +++++-------
1 file changed, 5 insertions(+), 7 deletions(-)
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index 4479c2d..3c96b2e 100644
---
a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++
b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -106,6 +106,7 @@ class KafkaCheckpointManager(checkpointSpec:
KafkaStreamSpec,
info(s"Starting the checkpoint SystemConsumer from oldest offset
$oldestOffset")
systemConsumer.register(checkpointSsp, oldestOffset)
systemConsumer.start()
+ // The consumer will be stopped after the checkpoints are read for the first time
}
/**
@@ -128,11 +129,11 @@ class KafkaCheckpointManager(checkpointSpec:
KafkaStreamSpec,
info(s"Reading checkpoint for taskName $taskName")
if (taskNamesToCheckpoints == null) {
- debug("Reading checkpoints for the first time")
+ info("Reading checkpoints for the first time")
taskNamesToCheckpoints = readCheckpoints()
- } else {
- debug("Updating existing checkpoint mappings")
- taskNamesToCheckpoints ++= readCheckpoints()
+ // Stop the system consumer since we only need to read checkpoints once
+ info("Stopping system consumer.")
+ systemConsumer.stop()
}
val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName,
null)
@@ -218,9 +219,6 @@ class KafkaCheckpointManager(checkpointSpec:
KafkaStreamSpec,
info ("Stopping system producer.")
producerRef.get().stop()
- info("Stopping system consumer.")
- systemConsumer.stop()
-
info("CheckpointManager stopped.")
}