[ https://issues.apache.org/jira/browse/KAFKA-3824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375977#comment-16375977 ]

ASF GitHub Bot commented on KAFKA-3824:
---------------------------------------

hachikuji closed pull request #1502: KAFKA-3824: Clarify the at least once delivery with auto commit enabled.
URL: https://github.com/apache/kafka/pull/1502
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04b41ba1d13..1c249cc34b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -983,17 +983,18 @@ public void assign(Collection<TopicPartition> partitions) {
 
         long now = time.milliseconds();
 
-        // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
+        // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records.
+        // It is crucial for "at least once" delivery semantics to ensure that offset commits can
+        // only be sent prior to updating the position in Fetcher's fetchedRecords method.
         client.executeDelayedTasks(now);
 
-        // init any new fetches (won't resend pending fetches)
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-
         // if data is available already, e.g. from a previous network client poll() call to commit,
-        // then just return it immediately
+        // then just return it immediately to avoid blocking in poll.
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
         if (!records.isEmpty())
             return records;
 
+        // send new fetches to any brokers which don't already have a request in flight.
         fetcher.sendFetches();
         client.poll(timeout, now);
         return fetcher.fetchedRecords();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ec351153f05..77f5e1f385c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -31,12 +31,13 @@
  * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription).
  *
  * Once assigned, the partition is not considered "fetchable" until its initial position has
- * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
- * position which is used to set the offset of the next fetch, and a consumed position
- * which is the last offset that has been returned to the user. You can suspend fetching
- * from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed
- * offsets. The partition will remain unfetchable until the {@link #resume(TopicPartition)} is
- * used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}.
+ * been set with {@link #seek(TopicPartition, long)}. Each fetchable partition tracks a position
+ * which is used to set the offset of the next fetch. The position used by the next fetch is
+ * one larger than the highest offset the consumer has seen in that partition. You can suspend
+ * fetching from a partition through {@link #pause(TopicPartition)} without affecting the
+ * fetched/consumed offsets. The partition will remain unfetchable until the
+ * {@link #resume(TopicPartition)} is used. You can also query the pause
+ * state independently with {@link #isPaused(TopicPartition)}.
  *
  * Note that pause state as well as fetch/consumed positions are not preserved when partition
  * assignment is changed whether directly by the user or through a group rebalance.
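The ordering argument in the KafkaConsumer hunk, together with the position definition in the SubscriptionState javadoc, can be illustrated with a toy model. This is a sketch under stated assumptions, not the real client: `ToyPoll`, `autoCommitTimerExpired` and the `commitBeforeFetch` switch are hypothetical names, the model covers a single partition with contiguous offsets starting at 0, and it ignores heartbeats, rebalances and network I/O. The `commitBeforeFetch = false` mode is a hypothetical reversed ordering included only for contrast.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical toy model of one partition (not the real KafkaConsumer).
// `position` is the offset of the next fetch: one larger than the highest
// offset already returned to the caller.
final class ToyPoll {
    private final long endOffset;             // offsets 0..endOffset-1 exist
    private final int maxRecords;             // hypothetical per-poll batch size
    private final boolean commitBeforeFetch;  // true = ordering described in the diff
    private long position = 0;
    private long committed = 0;
    private boolean autoCommitDue = false;

    ToyPoll(long endOffset, int maxRecords, boolean commitBeforeFetch) {
        this.endOffset = endOffset;
        this.maxRecords = maxRecords;
        this.commitBeforeFetch = commitBeforeFetch;
    }

    // Simulates the auto-commit interval elapsing between two poll() calls.
    void autoCommitTimerExpired() {
        autoCommitDue = true;
    }

    List<Long> poll() {
        // Safe ordering: delayed tasks run first (cf. client.executeDelayedTasks(now)).
        if (commitBeforeFetch) runDelayedTasks();
        List<Long> batch = new ArrayList<>();
        // Returning records advances the position (cf. Fetcher's fetchedRecords()).
        while (batch.size() < maxRecords && position < endOffset) {
            batch.add(position);
            position++;
        }
        // Hypothetical reversed ordering, for contrast only.
        if (!commitBeforeFetch) runDelayedTasks();
        return batch;
    }

    // A due auto-commit commits whatever the position is at this moment.
    private void runDelayedTasks() {
        if (autoCommitDue) {
            committed = position;
            autoCommitDue = false;
        }
    }

    long position() { return position; }
    long committed() { return committed; }
}
```

With the ordering from the diff, a commit that fires during the second poll() covers only offsets 0 to 2, which the application had already received from the first poll(). With the reversed ordering the same commit would record offset 6 before offsets 3 to 5 had been processed, so a crash at that point could skip them.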


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.


> Docs indicate auto.commit breaks at least once delivery but that is incorrect
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-3824
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3824
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 0.10.0.0
>            Reporter: Jay Kreps
>            Assignee: Jason Gustafson
>            Priority: Major
>              Labels: newbie
>             Fix For: 0.10.1.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The javadocs for the new consumer indicate that auto commit breaks at least 
> once delivery. This is no longer correct as of 0.10. 
> http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
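The issue's claim can be checked with a small crash-and-restart simulation. It is a sketch under stated assumptions, not the Kafka client: `AutoCommitReplay.run` is a hypothetical helper, there is one partition with offsets 0 to n-1, an auto-commit fires at the start of every poll() (the ordering the PR documents), and the application finishes each batch before polling again. The consumer dies after handling `crashAfter` records, and a replacement resumes from the last committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of auto-commit plus a crash (not the real client).
final class AutoCommitReplay {
    // Returns every offset the application processed, in order, across the
    // crashed instance and its replacement.
    static List<Long> run(long n, int batchSize, int crashAfter) {
        List<Long> processed = new ArrayList<>();
        long position = 0;   // next offset to hand to the application
        long committed = 0;  // last offset committed by auto-commit
        int handled = 0;
        boolean crashed = false;

        while (!crashed && position < n) {
            // poll(): the auto-commit runs first and commits the current position,
            // which covers only records from earlier, fully processed batches.
            committed = position;
            long start = position;
            long end = Math.min(position + batchSize, n);
            position = end;  // handing out the batch advances the position
            // The application processes the batch and may die part-way through.
            for (long off = start; off < end; off++) {
                if (handled == crashAfter) {
                    crashed = true;
                    break;
                }
                processed.add(off);
                handled++;
            }
        }

        if (crashed) {
            // A replacement consumer resumes from the committed offset.
            for (long off = committed; off < n; off++) {
                processed.add(off);
            }
        }
        return processed;
    }
}
```

For every crash point, each offset is processed at least once, and records from the batch that was in progress are processed again after the restart (with n = 10, a batch size of 3 and a crash after 4 records, offset 3 is processed twice). That is at-least-once delivery: duplicates are possible but losses are not, as long as the application does not hand records off for asynchronous processing after poll() returns.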



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
