[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366292#comment-15366292 ]
ASF GitHub Bot commented on NIFI-2078:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r69931510

    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
    @@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
    +
    +    @Override
    +    public StateMap getState() throws IOException {
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +        final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
    +
    +        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        if(StringUtils.isEmpty(zookeeperConnectionString)
    +                || StringUtils.isEmpty(topic)
    +                || StringUtils.isEmpty(groupId)) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public void clear() throws IOException {
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +        KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
    --- End diff --

    Thanks, good point, I've added a synchronized block in clearExternalState().


> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
>                 Key: NIFI-2078
>                 URL: https://issues.apache.org/jira/browse/NIFI-2078
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.0.0
>
>
> By its nature, a given processor may hold state that it manages itself
> (using NiFi state management), state that is managed by some external
> service it interacts with (such as Kafka's offsets), or, in theory,
> both.
> With the new state management, we're giving users a way to
> reset state managed by NiFi for a given processor. But it doesn't apply to
> processors that keep state externally.
> We should consider offering a way to reset state that lets a processor
> call out to whatever external store it affects.
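
For readers following the thread, the pattern under discussion (check that the processor is configured, then read or clear the external state under a lock) can be sketched roughly as below. This is only an illustration, not the code in the pull request: `ExternalStateSketch`, `OffsetStore`, `getExternalState()` and `stateLock` are hypothetical names, and the in-memory map stands in for the offsets that the real processor keeps in ZooKeeper. Only `clearExternalState()` and `isReadyToAccessState()` are names taken from the comment and diff above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; none of these types are part of NiFi's API. */
final class ExternalStateSketch {

    /** Hypothetical in-memory stand-in for an external store such as ZooKeeper. */
    static final class OffsetStore {
        private final Map<String, String> offsets = new HashMap<>();

        synchronized void put(String partition, String offset) {
            offsets.put(partition, offset);
        }

        synchronized Map<String, String> snapshot() {
            return new HashMap<>(offsets);
        }

        synchronized void clear() {
            offsets.clear();
        }
    }

    // Single lock so a read and a clear never interleave.
    private final Object stateLock = new Object();
    private final OffsetStore store;
    private final String topic;
    private final String groupId;

    ExternalStateSketch(OffsetStore store, String topic, String groupId) {
        this.store = store;
        this.topic = topic;
        this.groupId = groupId;
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    /** Mirrors the guard in the diff: state is reachable only once configured. */
    private boolean isReadyToAccessState() {
        return store != null && !isEmpty(topic) && !isEmpty(groupId);
    }

    /** Returns a read-only copy of the external state, or null if not configured. */
    Map<String, String> getExternalState() {
        if (!isReadyToAccessState()) {
            return null;
        }
        synchronized (stateLock) {
            return Collections.unmodifiableMap(store.snapshot());
        }
    }

    /** Clears the external state; returns false if the processor is not configured. */
    boolean clearExternalState() {
        if (!isReadyToAccessState()) {
            return false;
        }
        synchronized (stateLock) {
            store.clear();
            return true;
        }
    }
}
```

Returning a boolean from `clearExternalState()` is a choice made for this sketch so a caller can tell a no-op from a real reset; the `clear()` method in the diff returns `void`.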