[ 
https://issues.apache.org/jira/browse/KAFKA-6189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16249419#comment-16249419
 ] 

Xinyang Gao edited comment on KAFKA-6189 at 11/13/17 11:57 AM:
---------------------------------------------------------------

I have seen similar behavior, which also led to message loss. The setup is as 
follows:

topic gao31 has 3 partitions and replication factor 3
min.insync.replicas=2
consumer uses the default "auto.offset.reset=latest"
consumer commits offsets manually with consumer.commitSync() after handling 
messages
unclean leader election = false
Kafka cluster has 3 brokers: kafka-foo-0, kafka-foo-1 and kafka-foo-2
consumer group ID is uklonvd826214
session.timeout.ms and max.poll.interval.ms use their default values
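
For reference, the consumer settings listed above can be sketched as plain properties. This is only an illustration of the described setup, not the actual test code: the class name, the bootstrap address, and the explicit enable.auto.commit=false are assumptions (the last one inferred from the manual commitSync() calls). The timeouts are left unset so the client defaults apply; the JoinGroup request in the logs reports sessionTimeout=10000 and rebalanceTimeout=300000.

```java
import java.util.Properties;

final class ReproConsumerConfig {
    // Builds the consumer settings from the setup list as plain string properties.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", "uklonvd826214");
        props.setProperty("auto.offset.reset", "latest");
        // Assumption: auto-commit is off because offsets are committed with commitSync().
        props.setProperty("enable.auto.commit", "false");
        // session.timeout.ms and max.poll.interval.ms are intentionally not set (defaults).
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical bootstrap address; the short host names are a simplification.
        build("kafka-foo-0:9092,kafka-foo-1:9092,kafka-foo-2:9092")
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```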

Initially, kafka-foo-1 is selected as the group coordinator.

To reproduce: 
1. Kill kafka-foo-1.
2. The following logs show that the consumer is disconnected from kafka-foo-1 
and tries to discover a new group coordinator:


{code:java}
2017-11-10 11:27:45,097 DEBUG org.apache.kafka.clients.NetworkClient            
            - Node 2147483646 disconnected. [kafka-consumer]
2017-11-10 11:27:45,097 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the 
coordinator kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483646 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from 
least loaded node selection: is-blacked-out: true, in-flight-requests: 0 
[kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from 
least loaded node selection: is-blacked-out: false, in-flight-requests: 1 
[kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,098 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
GroupCoordinator request for group uklonvd826214 to broker 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from 
least loaded node selection: is-blacked-out: true, in-flight-requests: 0 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from 
least loaded node selection: is-blacked-out: false, in-flight-requests: 1 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,098 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
GroupCoordinator request for group uklonvd826214 to broker 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from 
least loaded node selection: is-blacked-out: true, in-flight-requests: 0 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from 
least loaded node selection: is-blacked-out: false, in-flight-requests: 1 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,099 DEBUG org.apache.kafka.clients.NetworkClient            
            - Sending metadata request (type=MetadataRequest, topics=gao31) to 
node 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending METADATA {topics=[gao31],allow_auto_topic_creation=true} 
to node 0. [kafka-consumer]
2017-11-10 11:27:45,100 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending FIND_COORDINATOR 
{coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:27:45,276 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2, for key 1, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=1631,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1630,
 timestamp=1510313264997, key=1 bytes, value=102 
bytes))]},{partition_header={partition=0,error_code=0,high_watermark=1223,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[]}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1630 for partition gao31-2 returned fetch data 
(error=NONE, highWaterMark=1631, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1223 for partition gao31-0 returned fetch data 
(error=NONE, highWaterMark=1223, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=0) [kafka-consumer]
2017-11-10 11:27:45,341 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 1, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1224,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1223,
 timestamp=1510313265277, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,341 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1223 for partition gao31-1 returned fetch data 
(error=NONE, highWaterMark=1224, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,342 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 3, received 
{throttle_time_ms=0,brokers=[{node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=1,host=kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null}],cluster_id=Y8zodxM7TNi-19dFcgErpw,controller_id=0,topic_metadata=[{topic_error_code=0,topic=gao31,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=2,leader=2,replicas=[1,2,0],isr=[0,2]},{partition_error_code=0,partition_id=1,leader=0,replicas=[0,1,2],isr=[0,2]},{partition_error_code=0,partition_id=0,leader=2,replicas=[2,0,1],isr=[2,0]}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,342 DEBUG org.apache.kafka.clients.Metadata                 
            - Updated cluster metadata version 4 to Cluster(id = 
Y8zodxM7TNi-19dFcgErpw, nodes = 
[kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null), 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null), 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null)], 
partitions = [Partition(topic = gao31, partition = 0, leader = 2, replicas = 
[2,0,1], isr = [2,0]), Partition(topic = gao31, partition = 1, leader = 0, 
replicas = [0,1,2], isr = [0,2]), Partition(topic = gao31, partition = 2, 
leader = 2, replicas = [1,2,0], isr = [0,2])]) [kafka-consumer]
2017-11-10 11:27:45,343 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 10, received 
{throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}}
 [kafka-consumer]
2017-11-10 11:27:45,343 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
GroupCoordinator response ClientResponse(receivedTimeMs=1510313265343, 
latencyMs=245, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=7610,client_id=consumer-2},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 
rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,346 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,346 DEBUG org.apache.kafka.clients.NetworkClient            
            - Initiating connection to node 2147483645 at 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092. [kafka-consumer]
2017-11-10 11:27:45,350 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to 
read 188 bytes of data for partition gao31-2 with offset 1630 [kafka-consumer]
2017-11-10 11:27:45,350 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high 
watermark for partition gao31-2 to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Returning 
fetched records at offset 1630 for assigned partition gao31-2 and update 
position to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to 
read 0 bytes of data for partition gao31-0 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high 
watermark for partition gao31-0 to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Returning 
fetched records at offset 1223 for assigned partition gao31-0 and update 
position to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to 
read 188 bytes of data for partition gao31-1 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high 
watermark for partition gao31-1 to 1224 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Returning 
fetched records at offset 1223 for assigned partition gao31-1 and update 
position to 1224 [kafka-consumer]
{code}
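
A note on reading the node ids in these logs: the large ids (2147483646, 2147483645, 2147483647) appear to be the coordinator connection ids, which as far as I know the Java client derives as Integer.MAX_VALUE minus the broker id so that the coordinator connection is kept separate from the regular broker connection. The sketch below only reproduces that arithmetic; the class and method names are made up for illustration.

```java
final class CoordinatorNodeId {
    // Assumption: coordinator connection id = Integer.MAX_VALUE - broker id.
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    // Inverse mapping: recover the broker id from a coordinator connection id.
    static int brokerId(int coordinatorConnectionId) {
        return Integer.MAX_VALUE - coordinatorConnectionId;
    }

    public static void main(String[] args) {
        System.out.println(brokerId(2147483646)); // 1 -> kafka-foo-1 (the killed coordinator)
        System.out.println(brokerId(2147483645)); // 2 -> kafka-foo-2
        System.out.println(brokerId(2147483647)); // 0 -> kafka-foo-0
    }
}
```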



3. The consumer then tries to connect to kafka-foo-0 or kafka-foo-2 in a 
round-robin way:



{code:java}
2017-11-10 11:27:45,470 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483645, for key 8, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=16},{partition=1,error_code=16},{partition=2,error_code=16}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,470 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset 
commit for group uklonvd826214 failed: This is not the correct coordinator. 
[kafka-consumer]
2017-11-10 11:27:45,470 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the 
coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483645 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,573 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 1, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1225,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1224,
 timestamp=1510313265499, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,573 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1224 for partition gao31-1 returned fetch data 
(error=NONE, highWaterMark=1225, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,575 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2, for key 10, received 
{throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}}
 [kafka-consumer]
2017-11-10 11:27:45,575 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
GroupCoordinator response ClientResponse(receivedTimeMs=1510313265575, 
latencyMs=4, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=7616,client_id=consumer-2},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 
rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,575 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,576 TRACE 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
OffsetCommit request with {gao31-1=OffsetAndMetadata{offset=1224, metadata='no 
metadata'}, gao31-2=OffsetAndMetadata{offset=1631, metadata='no metadata'}, 
gao31-0=OffsetAndMetadata{offset=1223, metadata='no metadata'}} to coordinator 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: 
null) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending OFFSET_COMMIT 
{group_id=uklonvd826214,group_generation_id=7,member_id=consumer-2-82264aac-077f-469a-9783-2d8949bd61c3,retention_time=-1,topics=[{topic=gao31,partitions=[{partition=0,offset=1223,metadata=no
 metadata},{partition=1,offset=1224,metadata=no 
metadata},{partition=2,offset=1631,metadata=no metadata}]}]} to node 
2147483645. [kafka-consumer]
2017-11-10 11:27:45,581 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483645, for key 8, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=25},{partition=1,error_code=25},{partition=2,error_code=25}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset 
commit for group uklonvd826214 failed: The coordinator is not aware of this 
member. [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG com.db.gm.cto.core.tests.ContinuousConsumer       
            - unable to commit offsets [kafka-consumer]
2017-11-10 11:27:45,582 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Revoking 
previously assigned partitions [gao31-1, gao31-2, gao31-0] for group 
uklonvd826214 [kafka-consumer]
......
{code}
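
The two error codes in the OFFSET_COMMIT responses above can be matched to the messages the client prints right after them: error_code=16 is followed by "This is not the correct coordinator" and error_code=25 by "The coordinator is not aware of this member". A small lookup, given only as a reading aid, is sketched below; the class name is hypothetical and the symbolic names are the Kafka protocol names as I understand them (NOT_COORDINATOR and UNKNOWN_MEMBER_ID).

```java
final class CommitErrorCodes {
    // Maps the error codes that appear in the logs to a symbolic name and the logged message.
    static String describe(int errorCode) {
        switch (errorCode) {
            case 0:
                return "NONE";
            case 16:
                return "NOT_COORDINATOR: This is not the correct coordinator.";
            case 25:
                return "UNKNOWN_MEMBER_ID: The coordinator is not aware of this member.";
            default:
                return "code " + errorCode + " (does not appear in these logs)";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(16)); // first failed commit at 11:27:45,470
        System.out.println(describe(25)); // second failed commit at 11:27:45,581
    }
}
```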


This lasts for about 3 minutes.
4. Finally, the consumer discovers kafka-foo-0 and uses it as the group 
coordinator. However, the logs in red show that the new group coordinator does 
not know the last committed offset for this consumer group, so the offset is 
reset to "latest":




{code:java}
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
connected with no in-flight requests [kafka-consumer]
2017-11-10 11:30:05,817 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
GroupCoordinator request for group uklonvd826214 to broker 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending FIND_COORDINATOR 
{coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 10, received 
{throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092}}
 [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
GroupCoordinator response ClientResponse(receivedTimeMs=1510313405820, 
latencyMs=3, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=10942,client_id=consumer-2},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 
rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483647 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Disabling 
heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - (Re-)joining 
group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
JoinGroup ((type: JoinGroupRequest, groupId=uklonvd826214, 
sessionTimeout=10000, rebalanceTimeout=300000, 
memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, 
protocolType=consumer, 
groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@161cb9a1))
 to coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483647 rack: null) [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending JOIN_GROUP 
{group_id=uklonvd826214,session_timeout=10000,rebalance_timeout=300000,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
 lim=17 cap=17]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:05,823 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483647, for key 11, received 
{throttle_time_ms=0,error_code=0,generation_id=1113,group_protocol=range,leader_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,members=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_metadata=java.nio.HeapByteBuffer[pos=0
 lim=17 cap=17]}]} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
successful JoinGroup response for group uklonvd826214: 
org.apache.kafka.common.requests.JoinGroupResponse@6f13c4b9 [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Performing 
assignment for group uklonvd826214 using strategy range with subscriptions 
{consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Subscription(topics=[gao31])} 
[kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Finished 
assignment for group uklonvd826214: 
{consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Assignment(partitions=[gao31-0,
 gao31-1, gao31-2])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
leader SyncGroup for group uklonvd826214 to coordinator 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: 
null): (type=SyncGroupRequest, groupId=uklonvd826214, generationId=1113, 
memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, 
groupAssignment=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5) 
[kafka-consumer]
2017-11-10 11:30:05,824 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending SYNC_GROUP 
{group_id=uklonvd826214,generation_id=1113,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,group_assignment=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_assignment=java.nio.HeapByteBuffer[pos=0
 lim=33 cap=33]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483647, for key 14, received 
{throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0
 lim=33 cap=33]} [kafka-consumer]
2017-11-10 11:30:06,051 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully 
joined group uklonvd826214 with generation 1113 [kafka-consumer]
2017-11-10 11:30:06,051 TRACE 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Enabling 
heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:06,051 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting 
newly assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 
[kafka-consumer]
2017-11-10 11:30:06,052 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 fetching committed offsets for partitions: [gao31-1, gao31-2, 
gao31-0] [kafka-consumer]
2017-11-10 11:30:06,052 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending OFFSET_FETCH 
{group_id=uklonvd826214,topics=[{topic=gao31,partitions=[{partition=1},{partition=2},{partition=0}]}]}
 to node 2147483647. [kafka-consumer]
*{color:#d04437}2017-11-10 11:30:06,059 TRACE 
org.apache.kafka.clients.NetworkClient                        - Completed 
receive from node 2147483647, for key 9, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,offset=-1,metadata=,error_code=0},{partition=1,offset=-1,metadata=,error_code=0},{partition=2,offset=-1,metadata=,error_code=0}]}],error_code=0}
 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 has no committed offset for partition gao31-1 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 has no committed offset for partition gao31-2 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 has no committed offset for partition gao31-0 
[kafka-consumer]{color}*
2017-11-10 11:30:06,059 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Sending 
ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
partitionTimestamps={gao31-1=-1, gao31-2=-1, gao31-0=-1}, minVersion=0) to 
broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:30:06,060 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending LIST_OFFSETS 
{replica_id=-1,isolation_level=0,topics=[{topic=gao31,partitions=[{partition=0,timestamp=-1},{partition=1,timestamp=-1},{partition=2,timestamp=-1}]}]}
 to node 0. [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 2, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=0,timestamp=-1,offset=1288},{partition=1,error_code=0,timestamp=-1,offset=1290},{partition=2,error_code=0,timestamp=-1,offset=1718}]}]}
 [kafka-consumer]
2017-11-10 11:30:06,062 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Received 
ListOffsetResponse org.apache.kafka.common.requests.ListOffsetResponse@492e9c44 
from broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: 
null) [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Handling 
ListOffsetResponse response for gao31-1. Fetched offset 1290, timestamp -1 
[kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Handling 
ListOffsetResponse response for gao31-2. Fetched offset 1718, timestamp -1 
[kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Handling 
ListOffsetResponse response for gao31-0. Fetched offset 1288, timestamp -1 
[kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting 
offset for partition gao31-1 to offset 1290. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting 
offset for partition gao31-2 to offset 1718. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting 
offset for partition gao31-0 to offset 1288. [kafka-consumer]
{code}
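
To put a number on the loss, the reset in the logs above can be modelled roughly as follows. This is a simplified sketch, not the client's actual code path: the class and method names are hypothetical, and it assumes the consumer processed nothing between the failed OFFSET_COMMIT at 11:27:45,576 and the re-join (part of the log is elided, so the real gap may differ). The OffsetFetch response returns offset=-1 for every partition, so with auto.offset.reset=latest the consumer restarts at the offsets returned by LIST_OFFSETS.

```java
final class OffsetResetGap {
    // Value the OffsetFetch response in the logs carries when no offset is committed.
    static final long NO_COMMITTED_OFFSET = -1L;

    // Simplified model: resume from the committed offset if there is one,
    // otherwise fall back to the log end offset (auto.offset.reset=latest).
    static long startingPosition(long committedOffset, long logEndOffset) {
        return committedOffset == NO_COMMITTED_OFFSET ? logEndOffset : committedOffset;
    }

    // Records between the last offset the consumer tried to commit and the restart position.
    static long skipped(long lastAttemptedCommit, long startingPosition) {
        return Math.max(0L, startingPosition - lastAttemptedCommit);
    }

    public static void main(String[] args) {
        String[] partitions = {"gao31-0", "gao31-1", "gao31-2"};
        long[] attempted = {1223L, 1224L, 1631L}; // failed OFFSET_COMMIT request
        long[] logEnd = {1288L, 1290L, 1718L};    // LIST_OFFSETS response
        long total = 0L;
        for (int i = 0; i < partitions.length; i++) {
            long start = startingPosition(NO_COMMITTED_OFFSET, logEnd[i]);
            long gap = skipped(attempted[i], start);
            total += gap;
            System.out.println(partitions[i] + ": restart at " + start + ", skipped " + gap);
        }
        System.out.println("total skipped: " + total); // 65 + 66 + 87 = 218
    }
}
```

Under these assumptions roughly 218 records across the three partitions are never delivered to the consumer.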



I am wondering why the new group coordinator does not know the committed 
offset for the consumer group. Since the group ID never changed and the 
consumer merely left and re-joined, I would expect the new consumer to pick up 
the last committed offset and continue from there. Otherwise we lose messages 
on the consumer side (the messages produced while the group coordinator was 
down).

Any theories about this?

Thanks







was (Author: gaoxinyang):
I had seen similar behaviors which also led to message loss, setup is as 
follows:

topic has 3 partitions and replication factor 3, named gao31
min.insync.replicas=2
consumer has default "auto.offset.reset=latest"
consumer manually commitSync offsets after handling messages with 
consumer.commitSync()
unclean leader election = false
kafka cluster has 3 brokers, kafka-foo-0, kafka-foo-1 and kafka-foo-2
consumer group ID is uklonvd826214
session.timeout.ms and max.poll.interval.ms are using default values

Initially kafka-foo-1 is selected as group coordinator

To reproduce: 
1. kill kafka-foo-1 
2. See following logs which implies that consumer disconnect to kafka-foo-1 and 
try to discover a new group coordinator 


{code:java}
2017-11-10 11:27:45,097 DEBUG org.apache.kafka.clients.NetworkClient            
            - Node 2147483646 disconnected. [kafka-consumer]
2017-11-10 11:27:45,097 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the 
coordinator kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483646 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from 
least loaded node selection: is-blacked-out: true, in-flight-requests: 0 
[kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from 
least loaded node selection: is-blacked-out: false, in-flight-requests: 1 
[kafka-consumer]
2017-11-10 11:27:45,098 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,098 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
GroupCoordinator request for group uklonvd826214 to broker 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from 
least loaded node selection: is-blacked-out: true, in-flight-requests: 0 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from 
least loaded node selection: is-blacked-out: false, in-flight-requests: 1 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,098 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
GroupCoordinator request for group uklonvd826214 to broker 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null) from 
least loaded node selection: is-blacked-out: true, in-flight-requests: 0 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Removing node 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null) from 
least loaded node selection: is-blacked-out: false, in-flight-requests: 1 
[kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:27:45,099 DEBUG org.apache.kafka.clients.NetworkClient            
            - Sending metadata request (type=MetadataRequest, topics=gao31) to 
node 0 [kafka-consumer]
2017-11-10 11:27:45,099 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending METADATA {topics=[gao31],allow_auto_topic_creation=true} 
to node 0. [kafka-consumer]
2017-11-10 11:27:45,100 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending FIND_COORDINATOR 
{coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:27:45,276 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2, for key 1, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=2,error_code=0,high_watermark=1631,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1630,
 timestamp=1510313264997, key=1 bytes, value=102 
bytes))]},{partition_header={partition=0,error_code=0,high_watermark=1223,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[]}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1630 for partition gao31-2 returned fetch data 
(error=NONE, highWaterMark=1631, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,276 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1223 for partition gao31-0 returned fetch data 
(error=NONE, highWaterMark=1223, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=0) [kafka-consumer]
2017-11-10 11:27:45,341 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 1, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1224,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1223,
 timestamp=1510313265277, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,341 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1223 for partition gao31-1 returned fetch data 
(error=NONE, highWaterMark=1224, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,342 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 3, received 
{throttle_time_ms=0,brokers=[{node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=1,host=kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null},{node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092,rack=null}],cluster_id=Y8zodxM7TNi-19dFcgErpw,controller_id=0,topic_metadata=[{topic_error_code=0,topic=gao31,is_internal=false,partition_metadata=[{partition_error_code=0,partition_id=2,leader=2,replicas=[1,2,0],isr=[0,2]},{partition_error_code=0,partition_id=1,leader=0,replicas=[0,1,2],isr=[0,2]},{partition_error_code=0,partition_id=0,leader=2,replicas=[2,0,1],isr=[2,0]}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,342 DEBUG org.apache.kafka.clients.Metadata                 
            - Updated cluster metadata version 4 to Cluster(id = 
Y8zodxM7TNi-19dFcgErpw, nodes = 
[kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null), 
kafka-foo-1.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 1 rack: null), 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 rack: null)], 
partitions = [Partition(topic = gao31, partition = 0, leader = 2, replicas = 
[2,0,1], isr = [2,0]), Partition(topic = gao31, partition = 1, leader = 0, 
replicas = [0,1,2], isr = [0,2]), Partition(topic = gao31, partition = 2, 
leader = 2, replicas = [1,2,0], isr = [0,2])]) [kafka-consumer]
2017-11-10 11:27:45,343 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 10, received 
{throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}}
 [kafka-consumer]
2017-11-10 11:27:45,343 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
GroupCoordinator response ClientResponse(receivedTimeMs=1510313265343, 
latencyMs=245, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=7610,client_id=consumer-2},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 
rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,346 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,346 DEBUG org.apache.kafka.clients.NetworkClient            
            - Initiating connection to node 2147483645 at 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092. [kafka-consumer]
2017-11-10 11:27:45,350 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to 
read 188 bytes of data for partition gao31-2 with offset 1630 [kafka-consumer]
2017-11-10 11:27:45,350 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high 
watermark for partition gao31-2 to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Returning 
fetched records at offset 1630 for assigned partition gao31-2 and update 
position to 1631 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to 
read 0 bytes of data for partition gao31-0 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high 
watermark for partition gao31-0 to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Returning 
fetched records at offset 1223 for assigned partition gao31-0 and update 
position to 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Preparing to 
read 188 bytes of data for partition gao31-1 with offset 1223 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Updating high 
watermark for partition gao31-1 to 1224 [kafka-consumer]
2017-11-10 11:27:45,351 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Returning 
fetched records at offset 1223 for assigned partition gao31-1 and update 
position to 1224 [kafka-consumer]
{code}



3. The consumer then tries kafka-foo-0 and kafka-foo-2 in turn (round-robin) 
while looking for the group coordinator:



{code:java}
2017-11-10 11:27:45,470 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483645, for key 8, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=16},{partition=1,error_code=16},{partition=2,error_code=16}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,470 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset 
commit for group uklonvd826214 failed: This is not the correct coordinator. 
[kafka-consumer]
2017-11-10 11:27:45,470 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking the 
coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483645 rack: null) dead for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,573 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 1, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=1225,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null},record_set=[(record=DefaultRecord(offset=1224,
 timestamp=1510313265499, key=1 bytes, value=102 bytes))]}]}]} [kafka-consumer]
2017-11-10 11:27:45,573 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch 
READ_UNCOMMITTED at offset 1224 for partition gao31-1 returned fetch data 
(error=NONE, highWaterMark=1225, lastStableOffset = -1, logStartOffset = 0, 
abortedTransactions = null, recordsSizeInBytes=188) [kafka-consumer]
2017-11-10 11:27:45,575 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2, for key 10, received 
{throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=2,host=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com,port=9092}}
 [kafka-consumer]
2017-11-10 11:27:45,575 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
GroupCoordinator response ClientResponse(receivedTimeMs=1510313265575, 
latencyMs=4, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=7616,client_id=consumer-2},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2 
rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,575 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483645 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:27:45,576 TRACE 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Sending 
OffsetCommit request with {gao31-1=OffsetAndMetadata{offset=1224, metadata='no 
metadata'}, gao31-2=OffsetAndMetadata{offset=1631, metadata='no metadata'}, 
gao31-0=OffsetAndMetadata{offset=1223, metadata='no metadata'}} to coordinator 
kafka-foo-2.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483645 rack: 
null) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:27:45,576 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending OFFSET_COMMIT 
{group_id=uklonvd826214,group_generation_id=7,member_id=consumer-2-82264aac-077f-469a-9783-2d8949bd61c3,retention_time=-1,topics=[{topic=gao31,partitions=[{partition=0,offset=1223,metadata=no
 metadata},{partition=1,offset=1224,metadata=no 
metadata},{partition=2,offset=1631,metadata=no metadata}]}]} to node 
2147483645. [kafka-consumer]
2017-11-10 11:27:45,581 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483645, for key 8, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=25},{partition=1,error_code=25},{partition=2,error_code=25}]}]}
 [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Offset 
commit for group uklonvd826214 failed: The coordinator is not aware of this 
member. [kafka-consumer]
2017-11-10 11:27:45,581 DEBUG com.db.gm.cto.core.tests.ContinuousConsumer       
            - unable to commit offsets [kafka-consumer]
2017-11-10 11:27:45,582 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Revoking 
previously assigned partitions [gao31-1, gao31-2, gao31-0] for group 
uklonvd826214 [kafka-consumer]
......
{code}
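
For anyone decoding the trace above: as far as I understand the Java client, 
the very large node IDs are the coordinator connection's ID, computed as 
Integer.MAX_VALUE minus the broker ID. That matches the values in these logs 
(2147483646 for broker 1, 2147483645 for broker 2, 2147483647 for broker 0). 
Error codes 16 and 25 correspond to the two "Offset commit ... failed" messages 
logged right after them. A minimal sketch with hypothetical class and method 
names, not the client's own code:

```java
// Hypothetical helper for reading the consumer trace; not part of the Kafka client.
final class TraceDecoder {

    // Assumption: the consumer connects to the coordinator under the id
    // Integer.MAX_VALUE - brokerId, so the mapping can be inverted like this.
    static int brokerIdOf(int coordinatorNodeId) {
        return Integer.MAX_VALUE - coordinatorNodeId;
    }

    // Only the error codes that appear in the logs above are mapped.
    static String errorName(int code) {
        switch (code) {
            case 0:
                return "NONE";
            case 16:
                return "NOT_COORDINATOR";    // "This is not the correct coordinator."
            case 25:
                return "UNKNOWN_MEMBER_ID";  // "The coordinator is not aware of this member."
            default:
                return "UNMAPPED(" + code + ")";
        }
    }
}
```

With this, `TraceDecoder.brokerIdOf(2147483645)` gives broker 2 (kafka-foo-2), 
the broker that keeps being rediscovered and then rejecting the commit.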


This lasts for about 3 minutes.
4. Finally it discovers kafka-foo-0 and uses it as group coordinator. However, 
the logs in red show that the new group coordinator does not know the last 
committed offset for this consumer group, so the offset is reset to "latest":




{code:java}
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient            
            - Found least loaded node 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
connected with no in-flight requests [kafka-consumer]
2017-11-10 11:30:05,817 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
GroupCoordinator request for group uklonvd826214 to broker 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:30:05,817 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending FIND_COORDINATOR 
{coordinator_key=uklonvd826214,coordinator_type=0} to node 0. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 10, received 
{throttle_time_ms=0,error_code=0,error_message=null,coordinator={node_id=0,host=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com,port=9092}}
 [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
GroupCoordinator response ClientResponse(receivedTimeMs=1510313405820, 
latencyMs=3, disconnected=false, 
requestHeader={api_key=10,api_version=1,correlation_id=10942,client_id=consumer-2},
 responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=NONE, node=kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 
rack: null))) for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483647 rack: null) for group uklonvd826214. [kafka-consumer]
2017-11-10 11:30:05,820 TRACE 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Disabling 
heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - (Re-)joining 
group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:05,820 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
JoinGroup ((type: JoinGroupRequest, groupId=uklonvd826214, 
sessionTimeout=10000, rebalanceTimeout=300000, 
memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, 
protocolType=consumer, 
groupProtocols=org.apache.kafka.common.requests.JoinGroupRequest$ProtocolMetadata@161cb9a1))
 to coordinator kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 
2147483647 rack: null) [kafka-consumer]
2017-11-10 11:30:05,820 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending JOIN_GROUP 
{group_id=uklonvd826214,session_timeout=10000,rebalance_timeout=300000,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0
 lim=17 cap=17]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:05,823 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483647, for key 11, received 
{throttle_time_ms=0,error_code=0,generation_id=1113,group_protocol=range,leader_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,members=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_metadata=java.nio.HeapByteBuffer[pos=0
 lim=17 cap=17]}]} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Received 
successful JoinGroup response for group uklonvd826214: 
org.apache.kafka.common.requests.JoinGroupResponse@6f13c4b9 [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Performing 
assignment for group uklonvd826214 using strategy range with subscriptions 
{consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Subscription(topics=[gao31])} 
[kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Finished 
assignment for group uklonvd826214: 
{consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5=Assignment(partitions=[gao31-0,
 gao31-1, gao31-2])} [kafka-consumer]
2017-11-10 11:30:05,823 DEBUG 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Sending 
leader SyncGroup for group uklonvd826214 to coordinator 
kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 2147483647 rack: 
null): (type=SyncGroupRequest, groupId=uklonvd826214, generationId=1113, 
memberId=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5, 
groupAssignment=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5) 
[kafka-consumer]
2017-11-10 11:30:05,824 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending SYNC_GROUP 
{group_id=uklonvd826214,generation_id=1113,member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,group_assignment=[{member_id=consumer-2-7669057b-2d14-46bf-a1e7-9cdad2615fd5,member_assignment=java.nio.HeapByteBuffer[pos=0
 lim=33 cap=33]}]} to node 2147483647. [kafka-consumer]
2017-11-10 11:30:06,051 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 2147483647, for key 14, received 
{throttle_time_ms=0,error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0
 lim=33 cap=33]} [kafka-consumer]
2017-11-10 11:30:06,051 INFO  
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Successfully 
joined group uklonvd826214 with generation 1113 [kafka-consumer]
2017-11-10 11:30:06,051 TRACE 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Enabling 
heartbeat thread for group uklonvd826214 [kafka-consumer]
2017-11-10 11:30:06,051 INFO  
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Setting 
newly assigned partitions [gao31-1, gao31-2, gao31-0] for group uklonvd826214 
[kafka-consumer]
2017-11-10 11:30:06,052 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 fetching committed offsets for partitions: [gao31-1, gao31-2, 
gao31-0] [kafka-consumer]
2017-11-10 11:30:06,052 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending OFFSET_FETCH 
{group_id=uklonvd826214,topics=[{topic=gao31,partitions=[{partition=1},{partition=2},{partition=0}]}]}
 to node 2147483647. [kafka-consumer]
{color:#d04437}2017-11-10 11:30:06,059 TRACE 
org.apache.kafka.clients.NetworkClient                        - Completed 
receive from node 2147483647, for key 9, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,offset=-1,metadata=,error_code=0},{partition=1,offset=-1,metadata=,error_code=0},{partition=2,offset=-1,metadata=,error_code=0}]}],error_code=0}
 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 has no committed offset for partition gao31-1 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 has no committed offset for partition gao31-2 [kafka-consumer]
2017-11-10 11:30:06,059 DEBUG 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - Group 
uklonvd826214 has no committed offset for partition gao31-0 
[kafka-consumer]{color}
2017-11-10 11:30:06,059 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Sending 
ListOffsetRequest (type=ListOffsetRequest, replicaId=-1, 
partitionTimestamps={gao31-1=-1, gao31-2=-1, gao31-0=-1}, minVersion=0) to 
broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: null) 
[kafka-consumer]
2017-11-10 11:30:06,060 TRACE org.apache.kafka.clients.NetworkClient            
            - Sending LIST_OFFSETS 
{replica_id=-1,isolation_level=0,topics=[{topic=gao31,partitions=[{partition=0,timestamp=-1},{partition=1,timestamp=-1},{partition=2,timestamp=-1}]}]}
 to node 0. [kafka-consumer]
2017-11-10 11:30:06,062 TRACE org.apache.kafka.clients.NetworkClient            
            - Completed receive from node 0, for key 2, received 
{throttle_time_ms=0,responses=[{topic=gao31,partition_responses=[{partition=0,error_code=0,timestamp=-1,offset=1288},{partition=1,error_code=0,timestamp=-1,offset=1290},{partition=2,error_code=0,timestamp=-1,offset=1718}]}]}
 [kafka-consumer]
2017-11-10 11:30:06,062 TRACE 
org.apache.kafka.clients.consumer.internals.Fetcher           - Received 
ListOffsetResponse org.apache.kafka.common.requests.ListOffsetResponse@492e9c44 
from broker kafka-foo-0.dk0095-a.int.uk.paas.intranet.db.com:9092 (id: 0 rack: 
null) [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Handling 
ListOffsetResponse response for gao31-1. Fetched offset 1290, timestamp -1 
[kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Handling 
ListOffsetResponse response for gao31-2. Fetched offset 1718, timestamp -1 
[kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Handling 
ListOffsetResponse response for gao31-0. Fetched offset 1288, timestamp -1 
[kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting 
offset for partition gao31-1 to offset 1290. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting 
offset for partition gao31-2 to offset 1718. [kafka-consumer]
2017-11-10 11:30:06,062 DEBUG 
org.apache.kafka.clients.consumer.internals.Fetcher           - Resetting 
offset for partition gao31-0 to offset 1288. [kafka-consumer]
{code}
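
The sequence in this trace (OFFSET_FETCH returns offset=-1, then LIST_OFFSETS 
with timestamp=-1, then "Resetting offset") can be modelled roughly as below. 
This is a simplified sketch under my own assumptions, with hypothetical names, 
not the actual Fetcher/ConsumerCoordinator code; it does not model 
auto.offset.reset=none.

```java
// Simplified model of how the starting position is chosen after a rebalance.
final class StartPosition {

    enum ResetPolicy { EARLIEST, LATEST }

    // committedOffset < 0 stands for "no committed offset", as in the
    // OFFSET_FETCH response above (offset=-1 for every partition).
    static long resolve(long committedOffset,
                        long logStartOffset,
                        long logEndOffset,
                        ResetPolicy policy) {
        if (committedOffset >= 0) {
            // Normal case: resume where the group last committed.
            return committedOffset;
        }
        // No committed offset known to the coordinator: fall back to the reset policy.
        return policy == ResetPolicy.LATEST ? logEndOffset : logStartOffset;
    }
}
```

For gao31-1 in the log above, `resolve(-1, 0, 1290, ResetPolicy.LATEST)` yields 
1290, which is the offset the consumer was reset to.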



I am wondering why the new group coordinator does not know the committed 
offset for the consumer group. Since the group ID never changed and the 
consumer only left and re-joined, I expected the consumer to pick up the last 
committed offset and continue from there. Otherwise we lose messages on the 
consumer side (the messages produced during the group coordinator downtime).
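
To put a rough number on the gap, one can compare the offsets in the rejected 
OffsetCommit request (gao31-0=1223, gao31-1=1224, gao31-2=1631) with the 
offsets the consumer was reset to (1288, 1290, 1718). This is only an 
estimate: that commit was rejected and part of the log is elided, so the true 
last committed offsets are not visible here. A small sketch with a hypothetical 
helper:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: per-partition distance between the last known
// position and the position after the reset.
final class SkippedRange {

    static Map<String, Long> skipped(Map<String, Long> lastPosition,
                                     Map<String, Long> resetPosition) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : lastPosition.entrySet()) {
            Long reset = resetPosition.get(entry.getKey());
            if (reset != null) {
                // A reset behind the last position skips nothing, hence the floor at 0.
                result.put(entry.getKey(), Math.max(0L, reset - entry.getValue()));
            }
        }
        return result;
    }
}
```

With the offsets above this gives 65 messages for gao31-0, 66 for gao31-1 and 
87 for gao31-2.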

Any theories about this?

Thanks






> Loosing messages on OFFSET_OUT_OF_RANGE error in consumer
> ---------------------------------------------------------
>
>                 Key: KAFKA-6189
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6189
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.11.0.0
>            Reporter: Andrey
>         Attachments: kafkaLossingMessages.png
>
>
> Steps to reproduce:
> * Setup test:
> ** producer sends messages constantly. If cluster not available, then it will 
> retry
> ** consumer polling
> ** topic has 3 partitions and replication factor 3. 
> ** min.insync.replicas=2
> ** producer has "acks=all"
> ** consumer has default "auto.offset.reset=latest"
> ** consumer manually commitSync offsets after handling messages.
> **  unclean leader election = false
> ** kafka cluster has 3 brokers
> * Kill broker 0
> * In consumer's logs:
> {code}
> 2017-11-08 11:36:33,967 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch offset 
> 10706682 is out of range for partition mytopic-2, resetting offset 
> [kafka-consumer]
> 2017-11-08 11:36:33,968 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch offset 
> 8024431 is out of range for partition mytopic-1, resetting offset 
> [kafka-consumer]
> 2017-11-08 11:36:34,045 INFO  
> org.apache.kafka.clients.consumer.internals.Fetcher           - Fetch offset 
> 8029505 is out of range for partition mytopic-0, resetting offset 
> [kafka-consumer]
> {code}
> After that, consumer lost several messages on each partition.
> Expected:
> * return upper bound of range
> * consumer should resume from that offset instead of "auto.offset.reset".
> Workaround:
> * put "auto.offset.reset=earliest"
> * get a lot of duplicate messages, instead of lost
> Looks like this is what happening during the recovery from broker failure 
> (see attachment)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
