[ 
https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366292#comment-15366292
 ] 

ASF GitHub Bot commented on NIFI-2078:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r69931510
  
    --- Diff: 
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
 ---
    @@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, 
ProcessSession session, Map<Stri
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
    +
    +    @Override
    +    public StateMap getState() throws IOException {
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +        final Map<String, String> partitionOffsets = 
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
    +
    +        return new StandardStateMap(partitionOffsets, 
System.currentTimeMillis());
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        if(StringUtils.isEmpty(zookeeperConnectionString)
    +                || StringUtils.isEmpty(topic)
    +                || StringUtils.isEmpty(groupId)) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    @Override
    +    public void clear() throws IOException {
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +        KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, 
groupId);
    --- End diff --
    
    Thanks, good point, I've added synchronized block in clearExternalState().


> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
>                 Key: NIFI-2078
>                 URL: https://issues.apache.org/jira/browse/NIFI-2078
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.0.0
>
>
> Inherently by the nature of a given processor it may involve state managed by 
> itself (using nifi state management), or can be managed by some external 
> service it interacts with (kafka's offset), and theoretically some might have 
> both going on. With the new state management, we're giving users a way to 
> reset state managed by nifi for a given processor. But it doesnt apply to 
> those processors who have external state.
> we should consider offering a way to reset state that allows a processor to 
> call out to whatever external store it impacts



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to