This is an automated email from the ASF dual-hosted git repository.
japetrsn pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push:
new 30f549d Set running state after brokers are connected (#340)
30f549d is described below
commit 30f549db6014bcc3e8e1e340e39a2f5cdc58a784
Author: James Dubee <[email protected]>
AuthorDate: Mon May 13 16:58:28 2019 -0400
Set running state after brokers are connected (#340)
---
provider/consumer.py | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/provider/consumer.py b/provider/consumer.py
index 2ced050..91d4094 100644
--- a/provider/consumer.py
+++ b/provider/consumer.py
@@ -329,7 +329,6 @@ class ConsumerProcess (Process):
if self.secondsSinceLastPoll() < 0:
logging.info('[{}] Completed first poll'.format(self.trigger))
- self.__recordState(Consumer.State.Running)
if (message is not None):
if not message.error():
@@ -530,6 +529,10 @@ class ConsumerProcess (Process):
def __on_assign(self, consumer, partitions):
logging.info('[{}] Completed partition assignment. Connected to broker(s)'.format(self.trigger))
+ if self.currentState() == Consumer.State.Initializing and self.__shouldRun():
+ logging.info('[{}] Setting consumer state to runnning.'.format(self.trigger))
+ self.__recordState(Consumer.State.Running)
+
def __on_revoke(self, consumer, partitions):
logging.info('[{}] Partition assignment has been revoked. Disconnected from broker(s)'.format(self.trigger))