[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15394488#comment-15394488 ]
ASF GitHub Bot commented on NIFI-2078: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/563#discussion_r72330212 --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java --- @@ -54,7 +72,12 @@ @InputRequirement(Requirement.INPUT_FORBIDDEN) @CapabilityDescription("Consumes messages from Apache Kafka") @Tags({ "Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume" }) -public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]>> { +@Stateful(scopes = {Scope.EXTERNAL}, description = "After consuming messages, ConsumeKafka commits its offset information to Kafka" + + " so that the state of a consumer group can be retained across events such as consumer reconnect." + + " Offsets can be cleared when there is no consumer subscribing with the same consumer group id." + + " It may take more than 30 seconds for a consumer group to become able to be cleared after it is stopped from NiFi." + + " Once offsets are cleared, ConsumeKafka will resume consuming messages based on Offset Reset configuration.") +public class ConsumeKafka extends AbstractKafkaProcessor<Consumer<byte[], byte[]>> implements ExternalStateManager { --- End diff -- Since "ExternalStateManager" is something that the developer will need to implement if they will use it, this should be documented in the Developer guide > State management for processors whose states are managed externally > ------------------------------------------------------------------- > > Key: NIFI-2078 > URL: https://issues.apache.org/jira/browse/NIFI-2078 > Project: Apache NiFi > Issue Type: Improvement > Components: Core Framework > Reporter: Koji Kawamura > Assignee: Koji Kawamura > Fix For: 1.0.0 > > > Inherently by the nature of a given processor it may involve state managed by > itself (using nifi state management), or can be managed by some external > service it interacts with (kafka's offset), and theoretically some might have > both going on. With the new state management, we're giving users a way to > reset state managed by nifi for a given processor. But it doesnt apply to > those processors who have external state. > we should consider offering a way to reset state that allows a processor to > call out to whatever external store it impacts -- This message was sent by Atlassian JIRA (v6.3.4#6332)