[ https://issues.apache.org/jira/browse/NIFI-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15366297#comment-15366297 ]

ASF GitHub Bot commented on NIFI-2078:
--------------------------------------

Github user ijokarumawak commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/563#discussion_r69931774
  
    --- Diff: nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java ---
    @@ -239,4 +300,188 @@ private void releaseFlowFile(FlowFile flowFile, ProcessContext context, ProcessS
             this.getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] { flowFile, msgCount, executionDuration });
             session.transfer(flowFile, REL_SUCCESS);
         }
    +
    +    @Override
    +    public StateMap getState() throws IOException {
    +
    +        if (!isReadyToAccessState()) {
    +            return null;
    +        }
    +
    +        final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    +        return submitConsumerGroupCommand("Fetch offsets", consumer -> {
    +            final Map<String, String> partitionOffsets = consumer.partitionsFor(topic).stream()
    +                    .map(p -> new TopicPartition(topic, p.partition()))
    +                    .map(tp -> new ImmutablePair<>(tp, consumer.committed(tp)))
    +                    .filter(tpo -> tpo.right != null)
    +                    .collect(Collectors.toMap(
    +                            tpo -> "partition:" + tpo.left.partition(),
    +                            tpo -> String.valueOf(tpo.right.offset())));
    +
    +            logger.info("Retrieved offsets from Kafka, topic={}, groupId={}, partitionOffsets={}",
    +                    topic, groupId, partitionOffsets);
    +
    +            return new StandardStateMap(partitionOffsets, System.currentTimeMillis());
    +        }, null);
    +    }
    +
    +    private boolean isReadyToAccessState() {
    +        if (StringUtils.isEmpty(topic) || StringUtils.isEmpty(brokers)
    +                || kafkaProperties == null || StringUtils.isEmpty(kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG))) {
    +            return false;
    +        }
    +        return true;
    +    }
    +
    +    /**
    +     * <p>Clear offsets stored in Kafka by committing -1 as the offset of each partition of the specified topic.</p>
    +     *
    +     * <p>Kafka allows commitSync if one of the following conditions is met
    +     * (see kafka.coordinator.GroupCoordinator.handleCommitOffsets for details):</p>
    +     * <ol>
    +     * <li>The consumer is a member of the consumer group. In this case,
    +     * offsets can be updated even if other consumers are connected to Kafka,
    +     * so it is dangerous to clear offsets while there are active consumers.
    +     * A consumer becomes a member of the consumer group once consumer.subscribe() and poll() have been called.</li>
    +     *
    +     * <li>No consumer in the group is connected,
    +     * and the Kafka GroupCoordinator has marked the group as dead.
    +     * This is safer, but it can take longer.</li>
    +     * </ol>
    +     *
    +     * <p>The consumer group state transition is an asynchronous operation in the Kafka group coordinator.
    +     * Although clear() can only be called while the processor is stopped,
    +     * the consumer group may not yet be fully removed in Kafka; in that case, a CommitFailedException will be thrown.</p>
    +     *
    +     * <p>When the GroupCoordinator has marked the group as dead, the following log message
    +     * appears in kafka.out on a Kafka broker server; this can take more than 30 seconds:</p>
    +     * <blockquote>[GroupCoordinator]: Group [gid] generation 1 is dead
    +     * and removed (kafka.coordinator.GroupCoordinator)</blockquote>
    +     *
    +     */
    +    @Override
    +    public void clear() throws IOException {
    +
    +        if (!isReadyToAccessState()) {
    +            return;
    +        }
    +
    +        final String groupId = kafkaProperties.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
    +        final Boolean result = submitConsumerGroupCommand("Clear offsets", consumer -> {
    --- End diff --
    
    Added synchronized block here as well.
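The partition-to-offset mapping performed inside getState() can be sketched without a Kafka broker. In this minimal sketch, the class OffsetStateMapping and its method toStateEntries are hypothetical, and a plain Map<Integer, Long> (where null means "no committed offset") stands in for the TopicPartition/OffsetAndMetadata pairs the diff obtains via consumer.committed(tp):

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper (not part of the PR) mirroring the getState() mapping:
// committed offsets keyed by partition number become "partition:<n>" -> "<offset>"
// entries, the shape stored in the StateMap.
final class OffsetStateMapping {

    private OffsetStateMapping() {
    }

    static Map<String, String> toStateEntries(Map<Integer, Long> committedOffsets) {
        return committedOffsets.entrySet().stream()
                // Skip partitions with no committed offset, like the
                // filter(tpo -> tpo.right != null) step in the diff.
                .filter(e -> e.getValue() != null)
                .collect(Collectors.toMap(
                        e -> "partition:" + e.getKey(),
                        e -> String.valueOf(e.getValue()),
                        (first, second) -> first, // keys are unique; merge never runs
                        TreeMap::new));           // stable iteration order for display
    }
}
```

Partitions without a committed offset are omitted rather than stored as empty values, which matches the filter step in the diff; the TreeMap only gives a stable (lexicographic) iteration order.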


> State management for processors whose states are managed externally
> -------------------------------------------------------------------
>
>                 Key: NIFI-2078
>                 URL: https://issues.apache.org/jira/browse/NIFI-2078
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>             Fix For: 1.0.0
>
>
> By its nature, a given processor may involve state managed by
> itself (using NiFi state management), state managed by an external
> service it interacts with (such as Kafka's offsets), or theoretically
> both. With the new state management, we're giving users a way to
> reset state managed by NiFi for a given processor, but it doesn't apply to
> processors that have external state.
> We should consider offering a way to reset state that allows a processor to
> call out to whatever external store it impacts.
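One way to picture the request is a small contract that a processor with external state could implement, similar to the getState() and clear() methods ConsumeKafka gains in the diff. The names ExternallyManagedState and InMemoryExternalState are hypothetical, not NiFi API, and a plain Map replaces NiFi's StateMap so the sketch compiles on its own; the shared lock loosely mirrors the synchronized blocks mentioned in the review comment:

```java
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical contract for a component whose state lives in an external
// system (for example, Kafka committed offsets). Names are illustrative only.
interface ExternallyManagedState {
    // Returns a snapshot of the external state.
    Map<String, String> getExternalState() throws IOException;

    // Resets the state held by the external system.
    void clearExternalState() throws IOException;
}

// In-memory stand-in for the external store, used only to exercise the contract.
final class InMemoryExternalState implements ExternallyManagedState {
    private final Object lock = new Object();
    private final Map<String, String> store = new HashMap<>();

    void put(String key, String value) {
        synchronized (lock) {
            store.put(key, value);
        }
    }

    @Override
    public Map<String, String> getExternalState() {
        // Same lock as clearExternalState(), so a read never interleaves with a reset.
        synchronized (lock) {
            return Collections.unmodifiableMap(new HashMap<>(store));
        }
    }

    @Override
    public void clearExternalState() {
        synchronized (lock) {
            store.clear();
        }
    }
}
```

Returning a copy keeps a snapshot taken before clearExternalState() unchanged afterwards, which makes "view state, then clear it" predictable for a caller such as a UI.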



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
