[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15401018#comment-15401018 ]
ASF GitHub Bot commented on NIFI-2078:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/563#discussion_r72899732
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
@@ -481,4 +493,59 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri
session.transfer(flowFile, REL_SUCCESS);
}
}
+
+ @Override
+ public ExternalStateScope getExternalStateScope() {
+ return ExternalStateScope.CLUSTER;
+ }
+
+ @Override
+ public StateMap getExternalState() throws IOException {
+ // We don't have to synchronize with onTrigger here.
+ // This only retrieves state from ZooKeeper over a separate channel, so it doesn't affect consuming.
+ if (!isReadyToAccessState()) {
+ return null;
+ }
+ final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+ return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+ }
+
+ private boolean isReadyToAccessState() {
+ return !StringUtils.isEmpty(zookeeperConnectionString)
+ && !StringUtils.isEmpty(topic)
+ && !StringUtils.isEmpty(groupId);
+ }
+
+ @Override
+ public void clearExternalState() throws IOException {
+ if (!isReadyToAccessState()) {
+ return;
+ }
+ // Block onTrigger from creating a new consumer until clearing the offsets finishes.
+ synchronized (this.consumerStreamsReady) {
+ KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
+ }
+ }
+
+ /**
+ * GetKafka overrides this method to capture the processor's property values required when it retrieves
+ * its state, which is managed externally in Kafka. Since a view/clear state operation can be executed before onTrigger() is called,
+ * we need to capture these values as they are modified. This method is also called when NiFi restarts and loads configs,
+ * so users can access external state right after NiFi restarts.
+ * @param descriptor of the modified property
+ * @param oldValue non-null property value (previous)
+ * @param newValue the new property value or if null indicates the property
+ */
+ @Override
+ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, String newValue) {
+ if (ZOOKEEPER_CONNECTION_STRING.equals(descriptor)) {
+ zookeeperConnectionString = newValue;
+ } else if (TOPIC.equals(descriptor)) {
+ topic = newValue;
+ } else if (GROUP_ID.equals(descriptor)) {
+ groupId = newValue;
--- End diff --
Thanks for pointing this out. Addressed.
Fixed bootstrap server address for ConsumeKafka, too.
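
For readers following the diff, the pattern it uses can be sketched in isolation: capture the relevant property values as they change, refuse to touch external state until all of them are set, and hold a lock while clearing so that a consumer cannot be created mid-clear. This is only a minimal sketch under stated assumptions. InMemoryOffsetStore and ExternalStateSketch are hypothetical names, the in-memory map stands in for ZooKeeper, and none of this is the NiFi API or the code in the pull request.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the external offset store (ZooKeeper in the diff). */
class InMemoryOffsetStore {
    private final Map<String, Map<String, String>> offsetsByKey = new HashMap<>();

    synchronized void put(String topic, String groupId, String partition, String offset) {
        offsetsByKey.computeIfAbsent(topic + "/" + groupId, k -> new HashMap<>())
                .put(partition, offset);
    }

    synchronized Map<String, String> retrieve(String topic, String groupId) {
        // Return a copy so callers cannot mutate the store.
        return new HashMap<>(offsetsByKey.getOrDefault(topic + "/" + groupId, new HashMap<>()));
    }

    synchronized void clear(String topic, String groupId) {
        offsetsByKey.remove(topic + "/" + groupId);
    }
}

/** Hypothetical processor-like class; the method names only mirror those in the diff. */
class ExternalStateSketch {
    private final InMemoryOffsetStore store;
    // Assumption: the consuming side would take the same lock before creating a consumer.
    private final Object consumerLock = new Object();
    private volatile String topic;
    private volatile String groupId;

    ExternalStateSketch(InMemoryOffsetStore store) {
        this.store = store;
    }

    /** Capture property values as they change, so state is reachable before any trigger runs. */
    void onPropertyModified(String propertyName, String newValue) {
        if ("topic".equals(propertyName)) {
            topic = newValue;
        } else if ("groupId".equals(propertyName)) {
            groupId = newValue;
        }
    }

    private boolean isReadyToAccessState() {
        return topic != null && !topic.isEmpty()
                && groupId != null && !groupId.isEmpty();
    }

    /** Returns null while the required properties are not yet configured. */
    Map<String, String> getExternalState() {
        if (!isReadyToAccessState()) {
            return null;
        }
        return store.retrieve(topic, groupId);
    }

    /** Does nothing while the required properties are not yet configured. */
    void clearExternalState() {
        if (!isReadyToAccessState()) {
            return;
        }
        synchronized (consumerLock) {
            store.clear(topic, groupId);
        }
    }
}
```

In this sketch a read takes no lock because it does not interfere with consuming, while a clear does, which matches the reasoning given in the diff's comments.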
> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
> Key: NIFI-2078
> URL: https://issues.apache.org/jira/browse/NIFI-2078
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Fix For: 1.0.0
>
>
> By its nature, a given processor may keep state that it manages itself (using
> NiFi state management), state that is managed by some external service it
> interacts with (such as Kafka's offsets), or theoretically both. With the new
> state management, we're giving users a way to reset the state managed by NiFi
> for a given processor, but this doesn't apply to processors that have
> external state.
> We should consider offering a way to reset state that allows a processor to
> call out to whatever external store it impacts.