[ https://issues.apache.org/jira/browse/KAFKA-6119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16219689#comment-16219689 ]
Apurva Mehta commented on KAFKA-6119:
-------------------------------------

Thanks Gary. This indeed looks like a bug. I downloaded your program and ran it. Here are the Kafka data logs for the two partitions:

{noformat}
amehta-macbook-pro:kafka-logs apurva$ ~/workspace/confluent/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files producer-test-1/00000000000000000000.log --deep-iteration
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/apurva/workspace/confluent/kafka/core/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/apurva/workspace/confluent/kafka/tools/build/dependant-libs-2.11.11/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/apurva/workspace/confluent/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/apurva/workspace/confluent/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/apurva/workspace/confluent/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/apurva/workspace/confluent/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/apurva/workspace/confluent/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Dumping producer-test-1/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1508970707070 isvalid: true keysize: -1 valuesize: 23 magic: 2 compresscodec: NONE producerId: 0 sequence: 0 isTransactional: true headerKeys: []
offset: 1 position: 91 CreateTime: 1508970707085 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 0
amehta-macbook-pro:kafka-logs apurva$ ~/workspace/confluent/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files producer-test-0/00000000000000000000.log --deep-iteration
[SLF4J binding warnings identical to the above omitted]
Dumping producer-test-0/00000000000000000000.log
Starting offset: 0
offset: 0 position: 0 CreateTime: 1508970637066 isvalid: true keysize: -1 valuesize: 23 magic: 2 compresscodec: NONE producerId: 0 sequence: 0 isTransactional: true headerKeys: []
offset: 1 position: 91 CreateTime: 1508970666872 isvalid: true keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 0 sequence: -1 isTransactional: true headerKeys: [] endTxnMarker: ABORT coordinatorEpoch: 0
amehta-macbook-pro:kafka-logs apurva$
{noformat}

And here is the trace-level output from the Kafka producer:

{noformat}
15:30:36.041 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.1
15:30:36.041 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : c2a0d5f9b1f45bf5
15:30:36.041 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer with client id producer-1 created
15:30:36.042 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Transition from state UNINITIALIZED to INITIALIZING
15:30:36.042 [main] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] ProducerId set to -1 with epoch -1
15:30:36.048 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=test-producer-1508970635810, transactionTimeoutMs=100)
15:30:36.051 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId
test-producer-1508970635810] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION)
15:30:36.051 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=InitProducerIdRequest, transactionalId=test-producer-1508970635810, transactionTimeoutMs=100)
15:30:36.154 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node localhost:9092 (id: -1 rack: null)
15:30:36.211 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
15:30:36.211 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
15:30:36.212 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
15:30:36.214 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Created socket with SO_RCVBUF = 326640, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node -1
15:30:36.215 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1. Fetching API versions.
15:30:36.215 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node -1.
15:30:36.286 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node -1: (Produce(0): 0 to 5 [usable: 3], Fetch(1): 0 to 6 [usable: 5], Offsets(2): 0 to 2 [usable: 2], Metadata(3): 0 to 5 [usable: 4], LeaderAndIsr(4): 0 to 1 [usable: 0], StopReplica(5): 0 [usable: 0], UpdateMetadata(6): 0 to 4 [usable: 3], ControlledShutdown(7): 0 to 1 [usable: 1], OffsetCommit(8): 0 to 3 [usable: 3], OffsetFetch(9): 0 to 3 [usable: 3], FindCoordinator(10): 0 to 1 [usable: 1], JoinGroup(11): 0 to 2 [usable: 2], Heartbeat(12): 0 to 1 [usable: 1], LeaveGroup(13): 0 to 1 [usable: 1], SyncGroup(14): 0 to 1 [usable: 1], DescribeGroups(15): 0 to 1 [usable: 1], ListGroups(16): 0 to 1 [usable: 1], SaslHandshake(17): 0 to 1 [usable: 0], ApiVersions(18): 0 to 1 [usable: 1], CreateTopics(19): 0 to 2 [usable: 2], DeleteTopics(20): 0 to 1 [usable: 1], DeleteRecords(21): 0 [usable: 0], InitProducerId(22): 0 [usable: 0], OffsetForLeaderEpoch(23): 0 [usable: 0], AddPartitionsToTxn(24): 0 [usable: 0], AddOffsetsToTxn(25): 0 [usable: 0], EndTxn(26): 0 [usable: 0], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 [usable: 0], DescribeAcls(29): 0 [usable: 0], CreateAcls(30): 0 [usable: 0], DeleteAcls(31): 0 [usable: 0], DescribeConfigs(32): 0 [usable: 0], AlterConfigs(33): 0 [usable: 0], UNKNOWN(34): 0, UNKNOWN(35): 0, UNKNOWN(36): 0, UNKNOWN(37): 0)
15:30:36.287 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.323 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION)
15:30:36.426 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.430 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION)
15:30:36.535 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.538 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION)
15:30:36.642 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.648 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION)
15:30:36.751 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.755 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION)
15:30:36.859 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=FindCoordinatorRequest, coordinatorKey=test-producer-1508970635810, coordinatorType=TRANSACTION) to node localhost:9092 (id: -1 rack: null)
15:30:36.871 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 10.200.6.85:9092 (id: 0 rack: null)
15:30:36.871 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - Created socket with SO_RCVBUF = 310308, SO_SNDBUF = 146988, SO_TIMEOUT = 0 to node 0
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 0. Fetching API versions.
15:30:36.872 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating API versions fetch from node 0.
15:30:36.873 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Recorded API versions for node 0: [identical to the node -1 list above, omitted]
15:30:36.978 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=InitProducerIdRequest, transactionalId=test-producer-1508970635810, transactionTimeoutMs=100) to node 10.200.6.85:9092 (id: 0 rack: null)
15:30:37.052 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] ProducerId set to 0 with epoch 0
15:30:37.052 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Transition from state INITIALIZING to READY
15:30:37.052 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Transition from state READY to IN_TRANSACTION
15:30:37.053 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request (type=MetadataRequest, topics=producer-test) to node localhost:9092 (id: -1 rack: null)
15:30:37.061 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(id = MWSsYEsXSIKPeqPAE1C9rg, nodes = [10.200.6.85:9092 (id: 0 rack: null)], partitions = [Partition(topic = producer-test, partition = 1, leader = 0, replicas = [0], isr = [0]), Partition(topic = producer-test, partition = 0, leader = 0, replicas = [0], isr = [0])])
15:30:37.066 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Begin adding new partition producer-test-0 to transaction
15:30:37.071 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, producerId=0, producerEpoch=0, partitions=[producer-test-0])
15:30:37.072 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, producerId=0, producerEpoch=0, partitions=[producer-test-0]) to node 10.200.6.85:9092 (id: 0 rack: null)
15:30:37.081 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Successfully added partitions [producer-test-0] to transaction
15:30:37.081 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning sequence number 0 from producer (producerId=0, epoch=0) to dequeued batch from partition producer-test-0 bound for 10.200.6.85:9092 (id: 0 rack: null).
15:30:37.083 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.producer-test.records-per-batch
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.producer-test.bytes
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.producer-test.compression-rate
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.producer-test.record-retries
15:30:37.084 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.producer-test.record-errors
15:30:37.110 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Incremented sequence number for topic-partition producer-test-0 to 1
15:31:47.070 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Begin adding new partition producer-test-1 to transaction
15:31:47.071 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Transition from state IN_TRANSACTION to COMMITTING_TRANSACTION
15:31:47.072 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, producerId=0, producerEpoch=0, partitions=[producer-test-1])
15:31:47.073 [main] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Enqueuing transactional request (type=EndTxnRequest, transactionalId=test-producer-1508970635810, producerId=0, producerEpoch=0, result=COMMIT)
15:31:47.073 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=AddPartitionsToTxnRequest, transactionalId=test-producer-1508970635810, producerId=0, producerEpoch=0, partitions=[producer-test-1]) to node 10.200.6.85:9092 (id: 0 rack: null)
15:31:47.078 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Successfully added partitions [producer-test-1] to transaction
15:31:47.078 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.RecordAccumulator - Assigning sequence number 0 from producer (producerId=0, epoch=0) to dequeued batch from partition producer-test-1 bound for 10.200.6.85:9092 (id: 0 rack: null).
15:31:47.081 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Incremented sequence number for topic-partition producer-test-1 to 1
15:31:47.082 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - [TransactionalId test-producer-1508970635810] Sending transactional request (type=EndTxnRequest, transactionalId=test-producer-1508970635810, producerId=0, producerEpoch=0, result=COMMIT) to node 10.200.6.85:9092 (id: 0 rack: null)
15:31:47.085 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.TransactionManager - [TransactionalId test-producer-1508970635810] Transition from state COMMITTING_TRANSACTION to READY
15:31:47.085 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
15:31:47.085 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records.
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:
15:31:47.088 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency
15:31:47.089 [kafka-producer-network-thread | producer-1] DEBUG
org.apache.kafka.clients.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed.
15:31:47.089 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer with client id producer-1 has been closed
{noformat}

As we can see, the first message (sent to partition 0) was aborted, but the second message (sent to partition 1) was erroneously committed. Fencing is not working as it should here. This can be seen from the dump of the transaction log:

{noformat}
amehta-macbook-pro:__transaction_state-35 apurva$ ~/workspace/confluent/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --transaction-log-decoder --files 00000000000000000000.log --deep-iteration
Dumping 00000000000000000000.log
Starting offset: 0
[SLF4J binding warnings identical to the above omitted]
offset: 0 position: 0 CreateTime: 1508970637004 isvalid: true keysize: 31 valuesize: 37 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: false headerKeys: [] key: transactionalId=test-producer-1508970635810 payload: producerId:0,producerEpoch:0,state=Empty,partitions=Set(),txnLastUpdateTimestamp=1508970636999,txnTimeoutMs=100
offset: 1 position: 137 CreateTime: 1508970637077 isvalid: true keysize: 31 valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: false headerKeys: [] key: transactionalId=test-producer-1508970635810 payload: producerId:0,producerEpoch:0,state=Ongoing,partitions=Set(producer-test-0),txnLastUpdateTimestamp=1508970637076,txnTimeoutMs=100
offset: 2 position: 297 CreateTime: 1508970666849 isvalid: true keysize: 31 valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: false headerKeys: [] key: transactionalId=test-producer-1508970635810 payload: producerId:0,producerEpoch:0,state=PrepareAbort,partitions=Set(producer-test-0),txnLastUpdateTimestamp=1508970666848,txnTimeoutMs=100
offset: 3 position: 457 CreateTime: 1508970666884 isvalid: true keysize: 31 valuesize: 37 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: false headerKeys: [] key: transactionalId=test-producer-1508970635810 payload: producerId:0,producerEpoch:0,state=CompleteAbort,partitions=Set(),txnLastUpdateTimestamp=1508970666852,txnTimeoutMs=100
offset: 4 position: 594 CreateTime: 1508970707075 isvalid: true keysize: 31 valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: false headerKeys: [] key: transactionalId=test-producer-1508970635810 payload: producerId:0,producerEpoch:0,state=Ongoing,partitions=Set(producer-test-1),txnLastUpdateTimestamp=1508970707074,txnTimeoutMs=100
offset: 5 position: 754 CreateTime: 1508970707083 isvalid: true keysize: 31 valuesize: 60 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: false headerKeys: [] key: transactionalId=test-producer-1508970635810 payload: producerId:0,producerEpoch:0,state=PrepareCommit,partitions=Set(producer-test-1),txnLastUpdateTimestamp=1508970707083,txnTimeoutMs=100
offset: 6 position: 914 CreateTime: 1508970707086 isvalid: true keysize: 31 valuesize: 37 magic: 2 compresscodec: NONE producerId: -1 sequence: -1 isTransactional: false headerKeys: [] key: transactionalId=test-producer-1508970635810 payload: producerId:0,producerEpoch:0,state=CompleteCommit,partitions=Set(),txnLastUpdateTimestamp=1508970707084,txnTimeoutMs=100
amehta-macbook-pro:__transaction_state-35 apurva$
{noformat}

The transaction log shows no epoch bump: producerEpoch stays at 0 from Empty through CompleteAbort and into the second Ongoing transition. Hence the second send succeeded instead of being fenced.

> Silent Data Loss in Kafka011 Transactional Producer
> ---------------------------------------------------
>
>                 Key: KAFKA-6119
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6119
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, producer
>    Affects Versions: 0.11.0.0, 0.11.0.1
>         Environment: openjdk version "1.8.0_144"
> OpenJDK Runtime Environment (Zulu 8.23.0.3-macosx) (build 1.8.0_144-b01)
> OpenJDK 64-Bit Server VM (Zulu 8.23.0.3-macosx) (build 25.144-b01, mixed mode)
>            Reporter: Gary Y.
>            Priority: Blocker
>              Labels: reliability
>
> Kafka can lose data published by a transactional {{KafkaProducer}} under some circumstances, i.e., data that should be committed atomically may not be fully visible from a consumer with {{read_committed}} isolation level.
>
> *Steps to reproduce:*
> # Set {{transaction.timeout.ms}} to a low value such as {{100}}
> # Publish two messages in one transaction to different partitions of a topic with a sufficiently long time in-between the messages (e.g., 70 s).
> # Only the second message is visible with {{read_committed}} isolation level.
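The steps above boil down to the following producer sketch (a minimal illustration modeled on the reproduction steps, not a copy of the linked demo; the broker address, topic name, and record key are assumptions, and it needs a running 0.11.0.x broker plus kafka-clients on the classpath):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TxnTimeoutRepro {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        props.put("transactional.id", "test-producer-" + System.currentTimeMillis());
        props.put("transaction.timeout.ms", "100");         // step 1: very low timeout
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Step 2: two sends, 70 s apart. The coordinator aborts the
            // transaction long before the second send happens.
            producer.send(new ProducerRecord<>("producer-test", 0, null, "first"));
            Thread.sleep(70_000);
            producer.send(new ProducerRecord<>("producer-test", 1, "key1", "second"));
            // Expected: this commit fails because the transaction timed out.
            // Observed: it succeeds, and only the second message is committed.
            producer.commitTransaction();
        }
    }
}
```

With a {{read_committed}} consumer, only the message on producer-test-1 is then visible, matching the log dumps above.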
> See
> https://github.com/GJL/kafka011-transactional-producer-bug-demo/blob/master/src/main/java/com/garyyao/App.java
> for a full example. Detailed instructions can be found in the {{README.md}}:
> https://github.com/GJL/kafka011-transactional-producer-bug-demo
>
> *Why is this possible?*
> Because the transaction timeout is set to a low value, the transaction is rolled back quickly after the first message is sent. Indeed, the following entries can be found in the broker logs:
> {code}
> [2017-10-25 22:54:58,224] INFO [Transaction Coordinator 0]: Initialized transactionalId test-producer-1508964897483 with producerId 5 and producer epoch 0 on partition __transaction_state-10 (kafka.coordinator.transaction.TransactionCoordinator)
> [2017-10-25 22:55:24,260] INFO [Transaction Coordinator 0]: Completed rollback ongoing transaction of transactionalId: test-producer-1508964897483 due to timeout (kafka.coordinator.transaction.TransactionCoordinator)
> {code}
> After the rollback, the second message is sent to a different partition than the first message.
> Upon transaction commit, {{org.apache.kafka.clients.producer.internals.TransactionManager}} may enqueue an AddPartitionsToTxn request via {{addPartitionsToTransactionHandler}}:
> {code}
> private TransactionalRequestResult beginCompletingTransaction(TransactionResult transactionResult) {
>     if (!newPartitionsInTransaction.isEmpty())
>         enqueueRequest(addPartitionsToTransactionHandler());
>     EndTxnRequest.Builder builder = new EndTxnRequest.Builder(transactionalId, producerIdAndEpoch.producerId,
>             producerIdAndEpoch.epoch, transactionResult);
>     EndTxnHandler handler = new EndTxnHandler(builder);
>     enqueueRequest(handler);
>     return handler.result;
> }
> {code}
> As can be seen, the request is enqueued if {{newPartitionsInTransaction}} is non-empty. I suspect that because the second message goes to a different partition, this condition is satisfied.
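The fencing gap this implies can be illustrated with a self-contained toy model (hypothetical names; this is not Kafka's coordinator code, only the epoch check that fencing relies on): the timeout abort leaves the producer epoch unchanged, so a late AddPartitionsToTxn from the same producer passes the epoch check and re-opens the transaction.

```java
// Toy model of coordinator-side producer fencing. All names here are
// hypothetical; this only sketches the epoch check, not the real
// TransactionCoordinator implementation.
public class EpochFencingModel {
    public enum State { EMPTY, ONGOING, COMPLETE_ABORT }

    public State state = State.EMPTY;
    public int producerEpoch = 0;

    // Timeout rollback. Per the transaction-log dump in this comment, the
    // abort completes with producerEpoch still 0 (bumpEpoch == false).
    public void abortOnTimeout(boolean bumpEpoch) {
        state = State.COMPLETE_ABORT;
        if (bumpEpoch) {
            producerEpoch++;  // a bump here would fence the old producer
        }
    }

    // Mirrors the effect described for prepareAddPartitions: an
    // epoch-matching request re-enters ONGOING regardless of prior state.
    public boolean addPartitions(int requestEpoch) {
        if (requestEpoch < producerEpoch) {
            return false;  // request from a fenced (zombie) producer
        }
        state = State.ONGOING;
        return true;
    }

    public static void main(String[] args) {
        EpochFencingModel observed = new EpochFencingModel();
        observed.abortOnTimeout(false);  // what the logs show
        System.out.println("epoch-0 request accepted after abort: "
                + observed.addPartitions(0));

        EpochFencingModel withBump = new EpochFencingModel();
        withBump.abortOnTimeout(true);   // hypothetical fix
        System.out.println("accepted when the abort bumps the epoch: "
                + withBump.addPartitions(0));
    }
}
```

In the observed run only the first branch ever happens: the transaction log shows producerEpoch:0 through CompleteAbort and into the next Ongoing state, so the epoch check has nothing to reject.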
> In {{KafkaApis.scala}}, I can see that {{handleAddPartitionToTxnRequest}} may eventually call {{TransactionMetadata#prepareAddPartitions}}:
> {code}
> def prepareAddPartitions(addedTopicPartitions: immutable.Set[TopicPartition], updateTimestamp: Long): TxnTransitMetadata = {
>   val newTxnStartTimestamp = state match {
>     case Empty | CompleteAbort | CompleteCommit => updateTimestamp
>     case _ => txnStartTimestamp
>   }
>
>   prepareTransitionTo(Ongoing, producerId, producerEpoch, txnTimeoutMs, (topicPartitions ++ addedTopicPartitions).toSet,
>     newTxnStartTimestamp, updateTimestamp)
> }
> {code}
> Note that the first argument to {{prepareTransitionTo}}, {{newState}}, is always *Ongoing* here. I suspect that this puts the transaction, which should be aborted, back into _Ongoing_.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)