[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399724#comment-15399724 ]

ASF GitHub Bot commented on NIFI-2078:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r72829933
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
    @@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
    +
    +    @Override
    +    public ExternalStateScope getExternalStateScope() {
    +        return ExternalStateScope.CLUSTER;
    +    }
    +
    +    @Override
    +    public StateMap getExternalState() throws IOException {
    +        // We don't have to synchronize with onTrigger here,
    +        // since it merely retrieves state from Zk using different channel, it doesn't affect consuming.
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +        final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
    +
    +        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        return !StringUtils.isEmpty(zookeeperConnectionString)
    +                && !StringUtils.isEmpty(topic)
    +                && !StringUtils.isEmpty(groupId);
    +    }
    +
    +    @Override
    +    public void clearExternalState() throws IOException {
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +        // Block onTrigger starts creating new consumer until clear offset finishes.
    +        synchronized (this.consumerStreamsReady) {
    +            KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
    +        }
    +    }
    +
    +    /**
    +     * GetKafka overrides this method in order to capture processor's property values required when it retrieves
    +     * its state managed externally at Kafka. Since view/clear state operation can be executed before onTrigger() is called,
    +     * we need to capture these values as it's modified. This method is also called when NiFi restarts and loads configs,
    +     * so users can access external states right after restart of NiFi.
    +     * @param descriptor of the modified property
    +     * @param oldValue non-null property value (previous)
    +     * @param newValue the new property value or if null indicates the property
    +     */
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    +        if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
    +            zookeeperConnectionString = newValue;
    +        } else if (TOPIC.equals(descriptor)) {
    +            topic = newValue;
    +        } else if (GROUP_ID.equals(descriptor)) {
    +            groupId = newValue;
    --- End diff --
    
    I ran into an error caused by how groupId gets set. onPropertyModified is
    only called when someone modifies the Group ID property, but that property
    has a default value. So groupId stays null if the user never modifies it,
    and they will hit this error:
    
    2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] Processor Administratively Yielded for 1 sec due to processing failure
    2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] due to uncaught Exception: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    2016-07-29 13:24:12,284 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask
    java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:367) ~[na:na]
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1060) ~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_74]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_74]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_74]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_74]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_74]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_74]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_74]
        at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_74]
        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:360) ~[na:na]
        ... 12 common frames omitted
    Caused by: java.lang.NullPointerException: null
        at java.util.Hashtable.put(Hashtable.java:459) ~[na:1.8.0_74]
        at java.util.Properties.setProperty(Properties.java:166) ~[na:1.8.0_74]
        at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:238) ~[na:na]
        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:355) ~[na:na]
        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:352) ~[na:na]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_74]
        ... 3 common frames omitted
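
The failure mode in the trace above can be reproduced in isolation. The following is a minimal sketch, not the actual GetKafka code: the class, the `DEFAULT_GROUP_ID` constant, and the `effectiveValue` helper are hypothetical names for illustration, and the assumption is that the null value passed to `Properties.setProperty` in `createConsumers` is the group id. It shows that a field populated only from a modification callback stays null when the user keeps the default, and one possible way to guard against that by falling back to the default value.

```java
import java.util.Properties;

// Hypothetical stand-in for a processor that captures a property value
// only when the framework reports a modification.
final class GroupIdDefaultSketch {

    // Assumed default for illustration; the real default lives on the property descriptor.
    static final String DEFAULT_GROUP_ID = "default-group";

    // Set only from onPropertyModified, so it is null until the user edits the property.
    private String groupId;

    void onPropertyModified(String newValue) {
        groupId = newValue;
    }

    // Fall back to the default when no modification was ever captured.
    static String effectiveValue(String captured, String defaultValue) {
        return captured != null ? captured : defaultValue;
    }

    Properties createConsumerProps(boolean useFallback) {
        Properties props = new Properties();
        String value = useFallback ? effectiveValue(groupId, DEFAULT_GROUP_ID) : groupId;
        // Properties.setProperty rejects null values with a NullPointerException.
        props.setProperty("group.id", value);
        return props;
    }

    public static void main(String[] args) {
        GroupIdDefaultSketch processor = new GroupIdDefaultSketch();
        try {
            processor.createConsumerProps(false);
        } catch (NullPointerException e) {
            System.out.println("NPE when the default was never captured");
        }
        System.out.println(processor.createConsumerProps(true).getProperty("group.id"));
    }
}
```

In the processor itself, reading the value from the process context in onTrigger rather than from the captured field would likely avoid the problem as well, since the context resolves default values.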


> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
>                 Key: NIFI-2078
>                 URL: https://issues.apache.org/jira/browse/NIFI-2078
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.0.0
>
>
> By its nature, a given processor may keep state that it manages itself
> (using NiFi state management), state that is managed by an external service
> it interacts with (such as Kafka's offsets), or, in theory, both. With the
> new state management, we're giving users a way to reset the state managed by
> NiFi for a given processor, but that doesn't apply to processors that have
> external state.
> We should consider offering a way to reset state that lets a processor call
> out to whatever external store it affects.


