[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366292#comment-15366292 ]
ASF GitHub Bot commented on NIFI-2078:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/563#discussion_r69931510
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java ---
@@ -481,4 +496,51 @@ private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<Stri
             session.transfer(flowFile, REL_SUCCESS);
         }
     }
+
+    @Override
+    public StateMap getState() throws IOException {
+        if (!isReadyToAccessState()) {
+            return null;
+        }
+        final Map<String, String> partitionOffsets = KafkaUtils.retrievePartitionOffsets(zookeeperConnectionString, topic, groupId);
+
+        return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+    }
+
+    private boolean isReadyToAccessState() {
+        if(StringUtils.isEmpty(zookeeperConnectionString)
+                || StringUtils.isEmpty(topic)
+                || StringUtils.isEmpty(groupId)) {
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void clear() throws IOException {
+        if (!isReadyToAccessState()) {
+            return;
+        }
+        KafkaUtils.clearPartitionOffsets(zookeeperConnectionString, topic, groupId);
--- End diff --
Thanks, good point. I've added a synchronized block in clearExternalState().
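
For readers following the thread, here is a minimal sketch of the kind of synchronized block the reply refers to. It is not the code from the pull request: the class name `ExternalStateSketch`, the `stateLock` field, the `recordOffset` helper, and the in-memory map standing in for offsets kept in ZooKeeper are all assumptions made for illustration. The idea is only that reads and clears of externally managed state take the same lock, so a clear cannot interleave with a concurrent read or update.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a processor whose state lives in an external store. */
final class ExternalStateSketch {

    // Assumption: an in-memory map plays the role of the external store
    // (for GetKafka this would be partition offsets kept in ZooKeeper).
    private final Map<String, String> externalOffsets = new HashMap<>();

    // Single lock shared by every method that touches the external store.
    private final Object stateLock = new Object();

    /** Hypothetical helper that simulates the processor advancing an offset. */
    void recordOffset(String partition, String offset) {
        synchronized (stateLock) {
            externalOffsets.put(partition, offset);
        }
    }

    /** Returns an immutable snapshot of the external state. */
    Map<String, String> getExternalState() {
        synchronized (stateLock) {
            return Collections.unmodifiableMap(new HashMap<>(externalOffsets));
        }
    }

    /** Clears the external state while holding the same lock as readers and writers. */
    void clearExternalState() {
        synchronized (stateLock) {
            externalOffsets.clear();
        }
    }
}
```

Returning a copied snapshot from `getExternalState()` is a design choice of this sketch: a caller that holds an earlier snapshot is unaffected by a later `clearExternalState()`.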
> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
> Key: NIFI-2078
> URL: https://issues.apache.org/jira/browse/NIFI-2078
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Fix For: 1.0.0
>
>
> By its nature, a given processor may involve state that it manages itself
> (using NiFi state management), state managed by an external service it
> interacts with (such as Kafka's offsets), or, in theory, both. With the new
> state management, we're giving users a way to reset the state that NiFi
> manages for a given processor, but that doesn't apply to processors that
> have external state.
> We should consider offering a way to reset state that allows a processor to
> call out to whatever external store it affects.