Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/563#discussion_r72829933
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile,
ProcessSession session, Map<Stri
session.transfer(flowFile, REL_SUCCESS);
}
}
+
+ @Override
+ public ExternalStateScope getExternalStateScope() {
+ return ExternalStateScope.CLUSTER;
+ }
+
+ @Override
+ public StateMap getExternalState() throws IOException {
+ // We don't have to synchronize with onTrigger here,
+ // since it merely retrieves state from Zk using different
channel, it doesn't affect consuming.
+ if (!isReadyToAccessState()) {
+ return null;
+ }
+ final Map<String, String> partitionOffsets =
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+ return new StandardStateMap(partitionOffsets,
System.currentTimeMillis());
+ }
+
+ private boolean isReadyToAccessState() {
+ return !StringUtils.isEmpty(zookeeperConnectionString)
+ && !StringUtils.isEmpty(topic)
+ && !StringUtils.isEmpty(groupId);
+ }
+
+ @Override
+ public void clearExternalState() throws IOException {
+ if (!isReadyToAccessState()) {
+ return;
+ }
+ // Block onTrigger starts creating new consumer until clear offset
finishes.
+ synchronized (this.consumerStreamsReady) {
+ KafkaUtils.clearPartitionOffsets(zookeeperConnectionString,
topic, groupId);
+ }
+ }
+
+ /**
+ * GetKafka overrides this method in order to capture processor's
property values required when it retrieves
+ * its state managed externally at Kafka. Since view/clear state
operation can be executed before onTrigger() is called,
+ * we need to capture these values as it's modified. This method is
also called when NiFi restarts and loads configs,
+ * so users can access external states right after restart of NiFi.
+ * @param descriptor of the modified property
+ * @param oldValue non-null property value (previous)
+ * @param newValue the new property value or if null indicates the
property
+ */
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
+ if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+ zookeeperConnectionString = newValue;
+ } else if (TOPIC.equals(descriptor)) {
+ topic = newValue;
+ } else if (GROUP_ID.equals(descriptor)) {
+ groupId = newValue;
--- End diff --
I ran into an error due to how GroupId is getting set. This only gets
called if someone modifies the groupid but it has a default value. So it will
be null if the user never modifies it and they will hit this error:
2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6]
o.apache.nifi.processors.kafka.GetKafka
GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] Processor Administratively
Yielded for 1 sec due to processing failure
2016-07-29 13:24:12,283 WARN [Timer-Driven Process Thread-6]
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
GetKafka[id=3798f2c6-0156-1000-0000-0000138083b7] due to uncaught Exception:
java.lang.IllegalStateException: java.util.concurrent.ExecutionException:
java.lang.NullPointerException
2016-07-29 13:24:12,284 WARN [Timer-Driven Process Thread-6]
o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.IllegalStateException: java.util.concurrent.ExecutionException:
java.lang.NullPointerException
at
org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:367) ~[na:na]
at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
~[nifi-api-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1060)
~[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:136)
[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:47)
[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:123)
[nifi-framework-core-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[na:1.8.0_74]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[na:1.8.0_74]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[na:1.8.0_74]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[na:1.8.0_74]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_74]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_74]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_74]
Caused by: java.util.concurrent.ExecutionException:
java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
[na:1.8.0_74]
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
[na:1.8.0_74]
at
org.apache.nifi.processors.kafka.GetKafka.onTrigger(GetKafka.java:360) ~[na:na]
... 12 common frames omitted
Caused by: java.lang.NullPointerException: null
at java.util.Hashtable.put(Hashtable.java:459) ~[na:1.8.0_74]
at java.util.Properties.setProperty(Properties.java:166) ~[na:1.8.0_74]
at
org.apache.nifi.processors.kafka.GetKafka.createConsumers(GetKafka.java:238)
~[na:na]
at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:355)
~[na:na]
at org.apache.nifi.processors.kafka.GetKafka$1.call(GetKafka.java:352)
~[na:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
[na:1.8.0_74]
... 3 common frames omitted
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---