[
https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366297#comment-15366297
]
ASF GitHub Bot commented on NIFI-2078:
--------------------------------------
Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/563#discussion_r69931774
--- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
@@ -239,4 +300,188 @@ private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessS
this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
session.transfer(flowFile, REL_SUCCESS);
}
+
+ @Override
+ public StateMap getState() throws IOException {
+
+ if (!isReadyToAccessState()) {
+ return null;
+ }
+
+ final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+ return submitConsumerGroupCommand("Fetch offsets", consumer -> {
+ final Map<String, String> partitionOffsets = consumer.partitionsFor(topic).stream()
+ .map(p -> new TopicPartition(topic, p.partition()))
+ .map(tp -> new ImmutablePair<>(tp, consumer.committed(tp)))
+ .filter(tpo -> tpo.right != null)
+ .collect(Collectors.toMap(tpo ->
+ "partition:" + tpo.left.partition(),
+ tpo -> String.valueOf(tpo.right.offset())));
+
+ logger.info("Retrieved offsets from Kafka, topic={}, groupId={}, partitionOffsets={}",
+ topic, groupId, partitionOffsets);
+
+ return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
+ }, null);
+ }
+
+ private boolean isReadyToAccessState() {
+ if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(brokers)
+ || kafkaProperties == null || StringUtils.isEmpty(kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG))) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * <p>Clear offsets stored in Kafka, by committing -1 as offset of each partitions of specified topic.</p>
+ *
+ * <p>Kafka allows commitSync if one of following conditions are met,
+ * see kafka.coordinator.GroupCoordinator.handleCommitOffsets for detail:
+ * <ol>
+ * <li>The consumer is a member of the consumer group. In this case,
+ * even if there's other consumers connecting Kafka, offsets can be updated.
+ * It's dangerous to clear offsets if there're active consumers.
+ * When consumer.subscribe() and poll() are called, the consumer will be a member of the consumer group.</li>
+ *
+ * <li>There's no connected consumer within the group,
+ * and Kafka GroupCoordinator has marked the group as dead.
+ * It's safer but can take longer.</li>
+ * </ol>
+ *
+ * <p>The consumer group state transition is an async operation at Kafka group coordinator.
+ * Although clear() can only be called when the processor is stopped,
+ * the consumer group may not be fully removed at Kafka, in that case, CommitFailedException will be thrown.</p>
+ *
+ * <p>Following log msg can be found when GroupCoordinator has marked the group as dead
+ * in kafka.out on a Kafka broker server, it can take more than 30 seconds:
+ * <blockquote>[GroupCoordinator]: Group [gid] generation 1 is dead
+ * and removed (kafka.coordinator.GroupCoordinator)</blockquote></p>
+ *
+ */
+ @Override
+ public void clear() throws IOException {
+
+ if (!isReadyToAccessState()) {
+ return;
+ }
+
+ final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+ final Boolean result = submitConsumerGroupCommand("Clear offsets", consumer -> {
--- End diff --
Added synchronized block here as well.
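
For reference, the offset-to-state mapping done in getState() in the diff above can be tried without a broker. The following is only a minimal sketch, not the code from the pull request: the class PartitionOffsetState and the method toStateEntries are hypothetical names, and a plain Map<Integer, Long> stands in for the per-partition result of consumer.committed(tp), with null meaning that no offset has been committed. A TreeMap is used here only so that the key order is stable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper, not part of NiFi or Kafka.
class PartitionOffsetState {

    /**
     * Builds "partition:N" -> offset entries from a map of partition number
     * to committed offset. Partitions whose committed offset is null are
     * skipped, mirroring the filter(tpo -> tpo.right != null) step in the diff.
     */
    public static Map<String, String> toStateEntries(Map<Integer, Long> committedByPartition) {
        return committedByPartition.entrySet().stream()
                // Drop partitions that have no committed offset yet.
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(
                        e -> "partition:" + e.getKey(),
                        e -> String.valueOf(e.getValue()),
                        (first, second) -> second, // keys are unique, so this is never used
                        TreeMap::new));
    }

    public static void main(String[] args) {
        // Assumed sample data: partition 1 has no committed offset.
        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 42L);
        committed.put(1, null);
        committed.put(2, 7L);
        System.out.println(toStateEntries(committed));
    }
}
```

The null filter matters because Collectors.toMap rejects null values, so a partition without a committed offset has to be removed before the collect step.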
> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
> Key: NIFI-2078
> URL: https://issues.apache.org/jira/browse/NIFI-2078
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Core Framework
> Reporter: Koji Kawamura
> Assignee: Koji Kawamura
> Fix For: 1.0.0
>
>
> By its nature, a given processor may keep state that it manages itself
> (using NiFi state management), state that is managed by an external service
> it interacts with (such as Kafka's offsets), or in theory both. With the new
> state management, we're giving users a way to reset state managed by NiFi
> for a given processor, but that doesn't apply to processors that have
> external state.
> We should consider offering a way to reset state that allows a processor to
> call out to whatever external store it affects.