Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/563#discussion_r69931774
--- Diff:
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
---
@@ -239,4 +300,188 @@ private void releaseFlowFile(FlowFile flowFile,
ProcessContext context, ProcessS
this.getLogger().info("Successfully received {} from Kafka with {}
messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
session.transfer(flowFile, REL_SUCCESS);
}
+
+ @Override
+ public StateMap getState() throws IOException {
+
+ if (!isReadyToAccessState()) {
+ return null;
+ }
+
+ final String groupId =
kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+ return submitConsumerGroupCommand("Fetch offsets", consumer -> {
+ final Map<String, String> partitionOffsets =
consumer.partitionsFor(topic).stream()
+ .map(p -> new TopicPartition(topic, p.partition()))
+ .map(tp -> new ImmutablePair<>(tp,
consumer.committed(tp)))
+ .filter(tpo -> tpo.right != null)
+ .collect(Collectors.toMap(tpo ->
+ "partition:" + tpo.left.partition(),
+ tpo -> String.valueOf(tpo.right.offset())));
+
+ logger.info("Retrieved offsets from Kafka, topic={},
groupId={}, partitionOffsets={}",
+ topic, groupId, partitionOffsets);
+
+ return new StandardStateMap(partitionOffsets,
System.currentTimeMillis());
+ }, null);
+ }
+
+ private boolean isReadyToAccessState() {
+ if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(brokers)
+ || kafkaProperties == null ||
StringUtils.isEmpty(kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG)))
{
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * <p>Clear offsets stored in Kafka by committing -1 as the offset of
each partition of the specified topic.</p>
+ *
+ * <p>Kafka allows commitSync if one of the following conditions is met;
+ * see kafka.coordinator.GroupCoordinator.handleCommitOffsets for
details:
+ * <ol>
+ * <li>The consumer is a member of the consumer group. In this case,
+ * even if there are other consumers connected to Kafka, offsets can be
updated.
+ * It's dangerous to clear offsets if there are active consumers.
+ * When consumer.subscribe() and poll() are called, the consumer will
be a member of the consumer group.</li>
+ *
+ * <li>There's no connected consumer within the group,
+ * and Kafka GroupCoordinator has marked the group as dead.
+ * It's safer but can take longer.</li>
+ * </ol>
+ *
+ * <p>The consumer group state transition is an async operation at
Kafka group coordinator.
+ * Although clear() can only be called when the processor is stopped,
+ * the consumer group may not be fully removed at Kafka, in that case,
CommitFailedException will be thrown.</p>
+ *
+ * <p>The following log message can be found when GroupCoordinator has marked
the group as dead
+ * in kafka.out on a Kafka broker server; it can take more than 30
seconds:
+ * <blockquote>[GroupCoordinator]: Group [gid] generation 1 is dead
+ * and removed (kafka.coordinator.GroupCoordinator)</blockquote></p>
+ *
+ */
+ @Override
+ public void clear() throws IOException {
+
+ if (!isReadyToAccessState()) {
+ return;
+ }
+
+ final String groupId =
kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
+ final Boolean result = submitConsumerGroupCommand("Clear offsets",
consumer -> {
--- End diff --
Added synchronized block here as well.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---