[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399724#comment-15399724 ]
ASF GitHub Bot commented on NIFI-2078:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r72829933

--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri
             session.transfer(flowFile, REL_SUCCESS);
         }
     }
+
+    @Override
+    public ExternalStateScope getExternalStateScope() {
+        return ExternalStateScope.CLUSTER;
+    }
+
+    @Override
+    public StateMap getExternalState() throws IOException {
+        // We don't have to synchronize with onTrigger here,
+        // since it merely retrieves state from Zk using different channel, it doesn't affect consuming.
+        if (!isReadyToAccessState()) {
+            return null;
+        }
+        final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+    }
+
+    private boolean isReadyToAccessState() {
+        return !StringUtils.isEmpty(zookeeperConnectionString)
+                && !StringUtils.isEmpty(topic)
+                && !StringUtils.isEmpty(groupId);
+    }
+
+    @Override
+    public void clearExternalState() throws IOException {
+        if (!isReadyToAccessState()) {
+            return;
+        }
+        // Block onTrigger starts creating new consumer until clear offset finishes.
+        synchronized (this.consumerStreamsReady) {
+            KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
+        }
+    }
+
+    /**
+     * GetKafka overrides this method in order to capture processor's property values required when it retrieves
+     * its state managed externally at Kafka. Since view/clear state operation can be executed before onTrigger() is called,
+     * we need to capture these values as it's modified.
+     * This method is also called when NiFi restarts and loads configs,
+     * so users can access external states right after restart of NiFi.
+     * @param descriptor of the modified property
+     * @param oldValue non-null property value (previous)
+     * @param newValue the new property value or if null indicates the property
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+            zookeeperConnectionString = newValue;
+        } else if (TOPIC.equals(descriptor)) {
+            topic = newValue;
+        } else if (GROUP_ID.equals(descriptor)) {
+            groupId = newValue;
--- End diff --

I ran into an error caused by how groupId gets set. onPropertyModified is only called when someone modifies the Group ID, but that property has a default value. If the user never modifies it, groupId stays null and they hit this error:

2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] Processor Administratively Yielded for 1 sec due to processing failure
2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] due to uncaught Exception: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
2016-07-29 13:24:12,284 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:367) ~[na:na]
    at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1060) ~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_74]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_74]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_74]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_74]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_74]
    at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_74]
    at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:360) ~[na:na]
    ... 12 common frames omitted
Caused by: java.lang.NullPointerException: null
    at java.util.Hashtable.put(Hashtable.java:459) ~[na:1.8.0_74]
    at java.util.Properties.setProperty(Properties.java:166) ~[na:1.8.0_74]
    at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:238) ~[na:na]
    at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:355) ~[na:na]
    at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:352) ~[na:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_74]
    ... 3 common frames omitted

> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
>                 Key: NIFI-2078
>                 URL: https://issues.apache.org/jira/browse/NIFI-2078
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.0.0
>
>
> By its nature, a given processor may involve state that it manages itself
> (using NiFi state management), state managed by an external service it
> interacts with (such as Kafka's offsets), or, in theory, both. With the new
> state management, we're giving users a way to reset the state NiFi manages
> for a given processor, but that doesn't cover processors with external state.
> We should consider offering a way to reset state that allows a processor to
> call out to whatever external store it impacts.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
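The failure described in the review can be reproduced outside NiFi. What follows is a minimal, self-contained sketch under stated assumptions, not the patch that was eventually merged: Descriptor is a hypothetical stand-in for NiFi's PropertyDescriptor, GetKafkaStateSketch and its "example-group" default are invented for illustration, and "zookeeper.connect" / "group.id" are the old ZooKeeper-based Kafka consumer settings. The sketch seeds each cached field from its descriptor's default, so a Group ID the user never edits is not null. It also checks readiness before filling java.util.Properties, whose setProperty rejects null values (the Hashtable.put frame at the bottom of the trace).

```java
import java.util.Properties;

// Hypothetical stand-in for NiFi's PropertyDescriptor: only a name and a default value.
final class Descriptor {
    final String name;
    final String defaultValue;

    Descriptor(String name, String defaultValue) {
        this.name = name;
        this.defaultValue = defaultValue;
    }
}

// Sketch only: mirrors the field-caching pattern from the diff, but starts each
// field at its descriptor's default so an untouched property is never null.
class GetKafkaStateSketch {
    static final Descriptor ZOOKEEPER_CONNECTION_STRING = new Descriptor("ZooKeeper Connection String", null);
    static final Descriptor TOPIC = new Descriptor("Topic Name", null);
    // Hypothetical default; the real processor's default may differ.
    static final Descriptor GROUP_ID = new Descriptor("Group ID", "example-group");

    private volatile String zookeeperConnectionString = ZOOKEEPER_CONNECTION_STRING.defaultValue;
    private volatile String topic = TOPIC.defaultValue;
    private volatile String groupId = GROUP_ID.defaultValue;

    void onPropertyModified(Descriptor descriptor, String oldValue, String newValue) {
        // Assumption: a null newValue means the value was removed, so revert to the default.
        final String value = newValue != null ? newValue : descriptor.defaultValue;
        if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
            zookeeperConnectionString = value;
        } else if (TOPIC.equals(descriptor)) {
            topic = value;
        } else if (GROUP_ID.equals(descriptor)) {
            groupId = value;
        }
    }

    boolean isReadyToAccessState() {
        return !isEmpty(zookeeperConnectionString) && !isEmpty(topic) && !isEmpty(groupId);
    }

    // Fails fast with a descriptive message instead of the opaque NullPointerException
    // that Properties (a Hashtable) throws when given a null value.
    Properties consumerProperties() {
        if (!isReadyToAccessState()) {
            throw new IllegalStateException("ZooKeeper connection string, topic and group ID must all be set");
        }
        final Properties props = new Properties();
        props.setProperty("zookeeper.connect", zookeeperConnectionString);
        props.setProperty("group.id", groupId);
        return props;
    }

    String getGroupId() {
        return groupId;
    }

    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }
}
```

Seeding the fields at construction is only one option; the real processor could equally resolve a missing value from PropertyDescriptor.getDefaultValue() at the moment state is viewed, cleared, or a consumer is created.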