Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r72829933
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
    @@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri
                 session.transfer(flowFile, REL_SUCCESS);
             }
         }
    +
    +    @Override
    +    public ExternalStateScope getExternalStateScope() {
    +        return ExternalStateScope.CLUSTER;
    +    }
    +
    +    @Override
    +    public StateMap getExternalState() throws IOException {
    +        // We don't have to synchronize with onTrigger here,
    +        // since it merely retrieves state from Zk using different channel, it doesn't affect consuming.
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +        final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
    +
    +        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        return !StringUtils.isEmpty(zookeeperConnectionString)
    +                && !StringUtils.isEmpty(topic)
    +                && !StringUtils.isEmpty(groupId);
    +    }
    +
    +    @Override
    +    public void clearExternalState() throws IOException {
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +        // Block onTrigger starts creating new consumer until clear offset finishes.
    +        synchronized (this.consumerStreamsReady) {
    +            KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
    +        }
    +    }
    +
    +    /**
    +     * GetKafka overrides this method in order to capture processor's property values required when it retrieves
    +     * its state managed externally at Kafka. Since view/clear state operation can be executed before onTrigger() is called,
    +     * we need to capture these values as it's modified. This method is also called when NiFi restarts and loads configs,
    +     * so users can access external states right after restart of NiFi.
    +     * @param descriptor of the modified property
    +     * @param oldValue non-null property value (previous)
    +     * @param newValue the new property value or if null indicates the property
    +     */
    +    @Override
    +    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
    +        if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
    +            zookeeperConnectionString = newValue;
    +        } else if (TOPIC.equals(descriptor)) {
    +            topic = newValue;
    +        } else if (GROUP_ID.equals(descriptor)) {
    +            groupId = newValue;
    --- End diff --
    
    I ran into an error caused by how groupId gets set. onPropertyModified is
    only called when someone modifies the Group ID property, but that property
    has a default value. So groupId stays null if the user never modifies it,
    and they will hit this error:
    
    2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] Processor Administratively Yielded for 1 sec due to processing failure
    2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] due to uncaught Exception: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    2016-07-29 13:24:12,284 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask
    java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:367) ~[na:na]
        at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1060) ~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_74]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_74]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_74]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_74]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_74]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_74]
        at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
    Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
        at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_74]
        at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_74]
        at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:360) ~[na:na]
        ... 12 common frames omitted
    Caused by: java.lang.NullPointerException: null
        at java.util.Hashtable.put(Hashtable.java:459) ~[na:1.8.0_74]
        at java.util.Properties.setProperty(Properties.java:166) ~[na:1.8.0_74]
        at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:238) ~[na:na]
        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:355) ~[na:na]
        at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:352) ~[na:na]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_74]
        ... 3 common frames omitted
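
The bottom of the trace is Properties.setProperty rejecting a null value. A minimal standalone sketch of that failure, and of one possible guard, might look like the following. It does not use any NiFi classes: the class name, DEFAULT_GROUP_ID, effectiveGroupId and the "group.id" key are assumptions made for illustration, standing in for the Group ID property's declared default and for whatever createConsumers actually passes to setProperty.

```java
import java.util.Properties;

public class GroupIdDefaultSketch {
    // Hypothetical stand-in for the default value declared on the Group ID property.
    static final String DEFAULT_GROUP_ID = "default-group";

    // Mirrors the field in the diff: it is only assigned when the property is
    // modified, so it is still null if the user keeps the default.
    static String capturedGroupId = null;

    // One possible guard: fall back to the declared default when no
    // modification was ever observed.
    static String effectiveGroupId(String captured) {
        return captured != null ? captured : DEFAULT_GROUP_ID;
    }

    // Returns true if Properties.setProperty throws a NullPointerException
    // for the given value, which is what the stack trace above shows.
    static boolean setPropertyThrowsNpe(String value) {
        try {
            new Properties().setProperty("group.id", value);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Unmodified property: the captured value is null and setProperty fails.
        System.out.println(setPropertyThrowsNpe(capturedGroupId));
        // With the fallback applied, the same call succeeds.
        System.out.println(setPropertyThrowsNpe(effectiveGroupId(capturedGroupId)));
    }
}
```

Running main prints true and then false. In the processor itself the equivalent fix could instead read the value from the process context when consumers are created, since that already resolves defaults; the fallback above is only meant to show why the captured field alone is not enough.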


