Github user JPercivall commented on a diff in the pull request:
https://github.com/apache/nifi/pull/563#discussion_r72834762
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile,
ProcessSession session, Map<Stri
session.transfer(flowFile, REL_SUCCESS);
}
}
+
+ @Override
+ public ExternalStateScope getExternalStateScope() {
+ return ExternalStateScope.CLUSTER;
+ }
+
+ @Override
+ public StateMap getExternalState() throws IOException {
+ // We don't have to synchronize with onTrigger here,
+ // since it merely retrieves state from Zk using different
channel, it doesn't affect consuming.
+ if (!isReadyToAccessState()) {
+ return null;
+ }
+ final Map<String, String> partitionOffsets =
KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+ return new StandardStateMap(partitionOffsets,
System.currentTimeMillis());
+ }
+
+ private boolean isReadyToAccessState() {
+ return !StringUtils.isEmpty(zookeeperConnectionString)
+ && !StringUtils.isEmpty(topic)
+ && !StringUtils.isEmpty(groupId);
+ }
+
+ @Override
+ public void clearExternalState() throws IOException {
+ if (!isReadyToAccessState()) {
+ return;
+ }
+ // Block onTrigger starts creating new consumer until clear offset
finishes.
+ synchronized (this.consumerStreamsReady) {
+ KafkaUtils.clearPartitionOffsets(zookeeperConnectionString,
topic, groupId);
+ }
+ }
+
+ /**
+ * GetKafka overrides this method in order to capture processor's
property values required when it retrieves
+ * its state managed externally at Kafka. Since view/clear state
operation can be executed before onTrigger() is called,
+ * we need to capture these values as it's modified. This method is
also called when NiFi restarts and loads configs,
+ * so users can access external states right after restart of NiFi.
+ * @param descriptor of the modified property
+ * @param oldValue non-null property value (previous)
+ * @param newValue the new property value or if null indicates the
property
+ */
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String
oldValue, String newValue) {
+ if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+ zookeeperConnectionString = newValue;
+ } else if (TOPIC.equals(descriptor)) {
+ topic = newValue;
+ } else if (GROUP_ID.equals(descriptor)) {
+ groupId = newValue;
--- End diff --
Probably the easiest way to address this is to set groupId when the default
gets created for the property.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---