[ https://issues.apache.org/jira/browse/KAFKA-5880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16164186#comment-16164186 ]
Lae commented on KAFKA-5880:
----------------------------

No problem, I'll post the trace logging for the consumer here once I'm back at the work computer tomorrow. In the meantime, this is the trace logging from the server for one occurrence:

{noformat}
[2017-09-13 05:00:00,126] TRACE [KafkaApi-0] Handling request:{api_key=1,api_version=5,correlation_id=242419,client_id=consumer-1} -- {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,topics=[{topic=index-updates,partitions=[{partition=0,fetch_offset=6,log_start_offset=-1,max_bytes=1048576},{partition=9,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=5,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=6,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=0,log_start_offset=-1,max_bytes=1048576},{partition=7,fetch_offset=2,log_start_offset=-1,max_bytes=1048576},{partition=3,fetch_offset=46644,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=1,log_start_offset=-1,max_bytes=1048576},{partition=1,fetch_offset=255,log_start_offset=-1,max_bytes=1048576}]}]} from connection 172.17.0.4:9092-172.17.0.1:35521;securityProtocol:PLAINTEXT,principal:User:ANONYMOUS (kafka.server.KafkaApis)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-0, offset 6, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 6 in log index-updates-0 of length 0 bytes (kafka.log.Log)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-9, offset 0, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 0 in log index-updates-9 of length 0 bytes (kafka.log.Log)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-8, offset 0, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 0 in log index-updates-8 of length 0 bytes (kafka.log.Log)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-5, offset 0, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 0 in log index-updates-5 of length 0 bytes (kafka.log.Log)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-6, offset 0, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 0 in log index-updates-6 of length 0 bytes (kafka.log.Log)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-2, offset 0, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 0 in log index-updates-2 of length 0 bytes (kafka.log.Log)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-7, offset 2, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 2 in log index-updates-7 of length 145 bytes (kafka.log.Log)
[2017-09-13 05:00:00,126] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-3, offset 46644, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,126] TRACE Reading 1048576 bytes from offset 46644 in log index-updates-3 of length 7071465 bytes (kafka.log.Log)
[2017-09-13 05:00:00,127] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-4, offset 1, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,127] TRACE Reading 1048576 bytes from offset 1 in log index-updates-4 of length 72 bytes (kafka.log.Log)
[2017-09-13 05:00:00,127] TRACE [Replica Manager on Broker 0]: Fetching log segment for partition index-updates-1, offset 255, partition fetch size 1048576, remaining response limit 52428800, ignoring response/partition size limits (kafka.server.ReplicaManager)
[2017-09-13 05:00:00,127] TRACE Reading 1048576 bytes from offset 255 in log index-updates-1 of length 72 bytes (kafka.log.Log)
{noformat}
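For reference, broker-side TRACE output like the above can be enabled through the broker's log4j configuration. A minimal sketch, assuming the standard config/log4j.properties layout; the logger names are inferred from the class names shown in the log output and should be verified against the running version:

{noformat}
# config/log4j.properties (broker restart required)
log4j.logger.kafka.server.KafkaApis=TRACE
log4j.logger.kafka.server.ReplicaManager=TRACE
log4j.logger.kafka.log.Log=TRACE
{noformat}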
> Transactional producer and read committed consumer causes consumer to get stuck
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5880
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5880
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lae
>         Attachments: index-updates-3.zip
>
>
> We use transactional producers, and have configured the isolation level on the consumer to only read committed data. The consumer has somehow got into a stuck state where it can no longer move forward, because the Kafka server always returns an empty list of records even though there are thousands more successful transactions after the offset.
> This is an example producer code:
> {code:java}
> Properties config = new Properties();
> config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, UUID.randomUUID().toString());
> config.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
> config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
> try (Producer<String, String> producer = new KafkaProducer<>(config)) {
>     producer.initTransactions();
>     try {
>         producer.beginTransaction();
>         // Multiple producer.send(...) here
>         producer.commitTransaction();
>     } catch (Throwable e) {
>         producer.abortTransaction();
>     }
> }
> {code}
> This is the test consumer code:
> {code:java}
> Properties config = new Properties();
> config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
> config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
> config.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString().toLowerCase(ENGLISH));
> config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
> try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config)) {
>     consumer.subscribe(Collections.singleton("index-updates"));
>     while (true) {
>         ConsumerRecords<String, String> records = consumer.poll(5000);
>         for (ConsumerRecord<String, String> record : records) {
>             System.err.println(record.value());
>         }
>         consumer.commitSync();
>     }
> }
> {code}
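> A quick way to confirm that the consumer is genuinely stuck, rather than merely slow, is to compare its position with the broker's end offsets whenever poll() comes back empty. The following is a minimal sketch of such a check, not part of the original reproduction; it assumes the consumer configuration above and uses only standard KafkaConsumer methods (assignment(), position(), endOffsets()), plus org.apache.kafka.common.TopicPartition:
> {code:java}
> // Inside the poll loop above, after an empty poll():
> for (TopicPartition tp : consumer.assignment()) {
>     long position = consumer.position(tp); // next offset this consumer will fetch
>     long endOffset = consumer.endOffsets(Collections.singleton(tp)).get(tp);
>     // In read_committed mode, endOffsets() reports the last stable offset, so a
>     // persistent gap here means committed data the consumer should be able to read.
>     if (endOffset > position) {
>         System.err.printf("No progress on %s: position=%d, end=%d%n", tp, position, endOffset);
>     }
> }
> {code}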
> I have also attached the problematic partition data, index-updates-3.zip. To reproduce the issue using the data, run a local Kafka instance, create a topic called "index-updates" with 10 partitions, replace the content of the index-updates-3 log directory with the attached content, and then run the above consumer code (a sketch of these setup commands follows the output below).
> The consumer will then get stuck at some point (not always at the same offset), making no more progress even if you send new data into the partition (other partitions seem fine). The following example is from when the consumer was stuck at offset 46644, and the Kafka server always returned an empty list of records when the consumer fetched from 46644:
> {noformat}
> root@0b1e67f0c34b:/# /opt/kafka/bin/kafka-consumer-groups.sh --describe --group my-group --bootstrap-server localhost:9092
> Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
> TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID                                      HOST          CLIENT-ID
> index-updates  0          15281           15281           0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  1          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  2          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  3          46644           65735           19091  consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  4          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  5          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  6          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  7          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  8          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> index-updates  9          0               0               0      consumer-1-c8f3fede-ef6f-4d12-a426-44aa9c47e71f  /10.100.1.97  consumer-1
> root@0b1e67f0c34b:/#
> {noformat}
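> For reference, a minimal sketch of the setup commands described above, assuming a 2017-era Kafka distribution under /opt/kafka with a local ZooKeeper on port 2181 and log.dirs=/tmp/kafka-logs (these paths are placeholders, not taken from the report):
> {noformat}
> # Create the topic with 10 partitions
> /opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
>     --topic index-updates --partitions 10 --replication-factor 1
> # Stop the broker, swap in the attached partition data, then restart the broker
> rm -rf /tmp/kafka-logs/index-updates-3/*
> unzip index-updates-3.zip -d /tmp/kafka-logs/index-updates-3/
> {noformat}
> The attached segment can also be inspected for transaction control records with the stock dump tool (the segment file name below is a placeholder):
> {noformat}
> /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
>     --files /tmp/kafka-logs/index-updates-3/00000000000000000000.log --deep-iteration
> {noformat}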