[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366297#comment-15366297 ]
ASF GitHub Bot commented on NIFI-2078:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r69931774

    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
    @@ -239,4 +300,188 @@ private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessS
             this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
             session.transfer(flowFile, REL_SUCCESS);
         }
    +
    +    @Override
    +    public StateMap getState() throws IOException {
    +
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +
    +        final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    +        return submitConsumerGroupCommand("Fetch offsets", consumer -> {
    +            final Map<String, String> partitionOffsets = consumer.partitionsFor(topic).stream()
    +                    .map(p -> new TopicPartition(topic, p.partition()))
    +                    .map(tp -> new ImmutablePair<>(tp, consumer.committed(tp)))
    +                    .filter(tpo -> tpo.right != null)
    +                    .collect(Collectors.toMap(tpo ->
    +                            "partition:" + tpo.left.partition(),
    +                            tpo -> String.valueOf(tpo.right.offset())));
    +
    +            logger.info("Retrieved offsets from Kafka, topic={}, groupId={}, partitionOffsets={}",
    +                    topic, groupId, partitionOffsets);
    +
    +            return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
    +        }, null);
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(brokers)
    +                || kafkaProperties == null || StringUtils.isEmpty(kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    /**
    +     * <p>Clear offsets stored in Kafka, by committing -1 as offset of each partitions of specified topic.</p>
    +     *
    +     * <p>Kafka allows commitSync if one of following conditions are met,
    +     * see kafka.coordinator.GroupCoordinator.handleCommitOffsets for detail:
    +     * <ol>
    +     * <li>The consumer is a member of the consumer group. In this case,
    +     * even if there's other consumers connecting Kafka, offsets can be updated.
    +     * It's dangerous to clear offsets if there're active consumers.
    +     * When consumer.subscribe() and poll() are called, the consumer will be a member of the consumer group.</li>
    +     *
    +     * <li>There's no connected consumer within the group,
    +     * and Kafka GroupCoordinator has marked the group as dead.
    +     * It's safer but can take longer.</li>
    +     * </ol>
    +     *
    +     * <p>The consumer group state transition is an async operation at Kafka group coordinator.
    +     * Although clear() can only be called when the processor is stopped,
    +     * the consumer group may not be fully removed at Kafka, in that case, CommitFailedException will be thrown.</p>
    +     *
    +     * <p>Following log msg can be found when GroupCoordinator has marked the group as dead
    +     * in kafka.out on a Kafka broker server, it can take more than 30 seconds:
    +     * <blockquote>[GroupCoordinator]: Group [gid] generation 1 is dead
    +     * and removed (kafka.coordinator.GroupCoordinator)</blockquote></p>
    +     *
    +     */
    +    @Override
    +    public void clear() throws IOException {
    +
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +
    +        final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    +        final Boolean result = submitConsumerGroupCommand("Clear offsets", consumer -> {
    --- End diff --

    Added synchronized block here as well.

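
For anyone who wants to try the key-naming scheme used by getState() in the diff without a running broker, here is a minimal sketch. It is not code from the pull request: the class OffsetStateSketch and the method toStateMap are hypothetical names, and a plain Map<Integer, Long> stands in for the results of consumer.committed(tp), with null meaning the group has no committed offset for that partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the pull request.
class OffsetStateSketch {

    // committed: partition number -> committed offset, or null when the
    // consumer group has no committed offset for that partition (assumption:
    // this map stands in for calling consumer.committed(tp) per partition).
    static Map<String, String> toStateMap(Map<Integer, Long> committed) {
        return committed.entrySet().stream()
                // Collectors.toMap rejects null values, so drop them first.
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(
                        e -> "partition:" + e.getKey(),
                        e -> String.valueOf(e.getValue())));
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 42L);
        committed.put(1, null); // no committed offset yet
        committed.put(2, 7L);
        // Prints the two committed entries; partition 1 is omitted.
        System.out.println(toStateMap(committed));
    }
}
```

The null filter matters because Collectors.toMap throws a NullPointerException on null values; the diff avoids the same problem by filtering on tpo.right != null before collecting.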
> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
>                 Key: NIFI-2078
>                 URL: https://issues.apache.org/jira/browse/NIFI-2078
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.0.0
>
>
> Inherently by the nature of a given processor it may involve state managed by
> itself (using NiFi state management), or can be managed by some external
> service it interacts with (Kafka's offset), and theoretically some might have
> both going on. With the new state management, we're giving users a way to
> reset state managed by NiFi for a given processor. But it doesn't apply to
> those processors that have external state.
> We should consider offering a way to reset state that allows a processor to
> call out to whatever external store it impacts.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)