[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15399724#comment-15399724 ]
ASF GitHub Bot commented on NIFI-2078:
--------------------------------------
Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/563#discussion_r72829933
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri
session.transfer(flowFile, REL_SUCCESS);
}
}
+
+    @Override
+    public ExternalStateScope getExternalStateScope() {
+        return ExternalStateScope.CLUSTER;
+    }
+
+    @Override
+    public StateMap getExternalState() throws IOException {
+        // We don't have to synchronize with onTrigger here,
+        // since it merely retrieves state from Zk using different channel, it doesn't affect consuming.
+        if (!isReadyToAccessState()) {
+            return null;
+        }
+        final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+    }
+
+    private boolean isReadyToAccessState() {
+        return !StringUtils.isEmpty(zookeeperConnectionString)
+                && !StringUtils.isEmpty(topic)
+                && !StringUtils.isEmpty(groupId);
+    }
+
+    @Override
+    public void clearExternalState() throws IOException {
+        if (!isReadyToAccessState()) {
+            return;
+        }
+        // Block onTrigger starts creating new consumer until clear offset finishes.
+        synchronized (this.consumerStreamsReady) {
+            KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
+        }
+    }
+
+    /**
+     * GetKafka overrides this method in order to capture processor's property values required when it retrieves
+     * its state managed externally at Kafka. Since view/clear state operation can be executed before onTrigger() is called,
+     * we need to capture these values as it's modified. This method is also called when NiFi restarts and loads configs,
+     * so users can access external states right after restart of NiFi.
+     * @param descriptor of the modified property
+     * @param oldValue non-null property value (previous)
+     * @param newValue the new property value or if null indicates the property
+     */
+    @Override
+    public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+        if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+            zookeeperConnectionString = newValue;
+        } else if (TOPIC.equals(descriptor)) {
+            topic = newValue;
+        } else if (GROUP_ID.equals(descriptor)) {
+            groupId = newValue;
--- End diff --
I ran into an error due to how groupId is being set. onPropertyModified is only called when someone modifies the Group ID property, but that property has a default value. So groupId will be null if the user never modifies it, and they will hit this error:
2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.apache.nifi.processors.kafka.GetKafka GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] Processor Administratively Yielded for 1 sec due to processing failure
2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] due to uncaught Exception: java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
2016-07-29 13:24:12,284 WARN [Timer-Driven Process Thread-6] o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:367) ~[na:na]
	at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) ~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1060) ~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123) [nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_74]
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [na:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [na:1.8.0_74]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [na:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_74]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_74]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_74]
	at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_74]
	at org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:360) ~[na:na]
	... 12 common frames omitted
Caused by: java.lang.NullPointerException: null
	at java.util.Hashtable.put(Hashtable.java:459) ~[na:1.8.0_74]
	at java.util.Properties.setProperty(Properties.java:166) ~[na:1.8.0_74]
	at org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:238) ~[na:na]
	at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:355) ~[na:na]
	at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:352) ~[na:na]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_74]
	... 3 common frames omitted
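One possible way to avoid the null groupId is sketched below; it is an assumption, not necessarily the fix that was adopted. Each cached field is seeded with its property's default value, so it is never null before the user edits the property. The sketch is plain Java: Descriptor is a hypothetical stand-in for NiFi's PropertyDescriptor (in NiFi the default would presumably come from PropertyDescriptor.getDefaultValue()), and "default-group" is an illustrative value, not GetKafka's actual default. The root cause in the trace is java.util.Properties.setProperty rejecting a null value; buildConsumerProperties() makes the same call, so it would fail the same way if groupId were left null.

```java
import java.util.Properties;

/**
 * Hypothetical, self-contained sketch (plain Java, not NiFi code): a component
 * caches property values from an onPropertyModified-style callback. Seeding the
 * cached field with the property's default keeps it non-null even when the user
 * never edits that property.
 */
class GroupIdDefaultSketch {

    // Minimal stand-in for NiFi's PropertyDescriptor: a name plus a default value.
    static final class Descriptor {
        final String name;
        final String defaultValue;

        Descriptor(String name, String defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    // Illustrative default only; GetKafka's real default may differ.
    static final Descriptor GROUP_ID = new Descriptor("Group ID", "default-group");

    // Without this initializer the field stays null until the property is modified.
    private volatile String groupId = GROUP_ID.defaultValue;

    // Like the callback in the diff, this only runs when the property changes.
    void onPropertyModified(Descriptor descriptor, String oldValue, String newValue) {
        if (GROUP_ID.equals(descriptor)) {
            // Treat a null newValue (property removed) as a return to the default.
            groupId = (newValue != null) ? newValue : GROUP_ID.defaultValue;
        }
    }

    // Loosely modeled on the failing Properties.setProperty call in createConsumers():
    // setProperty throws NullPointerException if the value is null.
    Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        GroupIdDefaultSketch sketch = new GroupIdDefaultSketch();
        // Property never modified, yet no NullPointerException.
        System.out.println(sketch.buildConsumerProperties().getProperty("group.id")); // default-group
        sketch.onPropertyModified(GROUP_ID, "default-group", "my-group");
        System.out.println(sketch.buildConsumerProperties().getProperty("group.id")); // my-group
    }
}
```

onTrigger() could instead read the value from its ProcessContext, which applies the default, but the view/clear operations can run before onTrigger() is called (per the javadoc in the diff), so the cached fields still need a non-null starting value.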
> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
> Key: NIFI-2078
> URL: https://issues.apache.org/jira/browse/NIFI-2078
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Fix For: 1.0.0
>
>
> Inherently, by the nature of a given processor, it may involve state managed by
> itself (using NiFi state management), state managed by some external
> service it interacts with (Kafka's offsets), or theoretically both. With the
> new state management, we're giving users a way to reset state managed by
> NiFi for a given processor, but that doesn't apply to processors that have
> external state.
> We should consider offering a way to reset state that allows a processor to
> call out to whatever external store it affects.
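The request amounts to letting the framework call into a processor to view and clear state that lives outside NiFi. The minimal sketch below illustrates that shape under stated assumptions: ExternalStateStore, OffsetTrackingConsumer, commitOffset and the in-memory map are hypothetical names standing in for Kafka offsets stored in ZooKeeper, not NiFi's API. As in the GetKafka diff, both operations do nothing until the component is configured, and clearing holds the same lock as consumer activity so offsets cannot be committed halfway through a clear.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not NiFi API: a consumer whose offsets live outside
 * NiFi exposes view/clear hooks that a "reset state" action could call.
 */
class OffsetTrackingConsumer implements ExternalStateStore {
    // In-memory stand-in for offsets that would really be stored in ZooKeeper/Kafka.
    private final Map<String, String> partitionOffsets = new HashMap<>();
    // Shared by commitOffset() and clearExternalState() so they never interleave.
    private final Object consumerLock = new Object();
    // Captured from configuration; null until the user sets it.
    private volatile String topic;

    void setTopic(String topic) {
        this.topic = topic;
    }

    private boolean isReadyToAccessState() {
        return topic != null && !topic.isEmpty();
    }

    /** Simulates a consumer committing an offset for one partition. */
    void commitOffset(int partition, long offset) {
        synchronized (consumerLock) {
            partitionOffsets.put(topic + "-" + partition, Long.toString(offset));
        }
    }

    @Override
    public Map<String, String> getExternalState() {
        if (!isReadyToAccessState()) {
            return null; // same convention as the diff: nothing to show until configured
        }
        // HashMap is not thread-safe, so this stand-in locks even for reads.
        synchronized (consumerLock) {
            return Collections.unmodifiableMap(new HashMap<>(partitionOffsets));
        }
    }

    @Override
    public void clearExternalState() {
        if (!isReadyToAccessState()) {
            return;
        }
        // commitOffset() cannot run while offsets are being cleared
        // (the diff similarly blocks consumer creation during a clear).
        synchronized (consumerLock) {
            partitionOffsets.clear();
        }
    }

    public static void main(String[] args) {
        OffsetTrackingConsumer consumer = new OffsetTrackingConsumer();
        System.out.println(consumer.getExternalState()); // null
        consumer.setTopic("events");
        consumer.commitOffset(0, 42L);
        System.out.println(consumer.getExternalState()); // {events-0=42}
        consumer.clearExternalState();
        System.out.println(consumer.getExternalState()); // {}
    }
}

/** Hypothetical hooks a framework could call for state held outside NiFi. */
interface ExternalStateStore {
    /** Snapshot of the external state, or null if the component is not configured yet. */
    Map<String, String> getExternalState();

    /** Removes the external state; a no-op if the component is not configured yet. */
    void clearExternalState();
}
```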
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)