kirktrue commented on code in PR #13607: URL: https://github.com/apache/kafka/pull/13607#discussion_r1180838837
########## clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java: ########## @@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() { // Works fine with current version (>= 7) ProduceRequest.forCurrentMagic(produceData); } + + @Test + public void testNoMixedProducerIds() { + final long producerId1 = 15L; + final long producerId2 = 16L; + final short producerEpoch = 5; + final int sequence = 10; + + final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord("foo".getBytes())); + final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1, + producerEpoch, sequence, new SimpleRecord("bar".getBytes())); + final MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2, + producerEpoch, sequence, new SimpleRecord("bee".getBytes())); + + + ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic( + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList( + new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))), + new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))), + new ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList( Review Comment: Please change `bee` to `baz` 😛 ########## clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java: ########## @@ -222,6 +223,27 @@ public void clearPartitionRecords() { partitionSizes(); data = null; } + + public static void validateProducerIds(short version, ProduceRequestData data) { + if (version >= 3) { + long producerId = -1; + for (ProduceRequestData.TopicProduceData topicData : 
data.topicData()) { + for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitionData()) { + BaseRecords baseRecords = partitionData.records(); + if (baseRecords instanceof Records) { + Records records = (Records) baseRecords; + for (RecordBatch batch : records.batches()) { + if (producerId == -1 && batch.hasProducerId()) Review Comment: Is it an error if there's one batch with producer ID and another without one? ########## core/src/main/scala/kafka/server/ReplicaManager.scala: ########## @@ -642,7 +642,14 @@ class ReplicaManager(val config: KafkaConfig, (entriesPerPartition, Map.empty) else entriesPerPartition.partition { case (topicPartition, records) => - getPartitionOrException(topicPartition).hasOngoingTransaction(records.firstBatch().producerId()) + // Produce requests (only requests that require verification) should only have one batch in "batches" but check all just to be safe. + val transactionalBatches = records.batches.asScala.filter(batch => batch.hasProducerId && batch.isTransactional) Review Comment: Is it valid for `isTransactional` to be `true` but `hasProducerId` to be `false`? ########## clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java: ########## @@ -222,6 +223,27 @@ public void clearPartitionRecords() { partitionSizes(); data = null; } + + public static void validateProducerIds(short version, ProduceRequestData data) { + if (version >= 3) { Review Comment: For the uninitiated, can we move and/or comment why version `3` is special? 
########## clients/src/test/java/org/apache/kafka/common/requests/ProduceRequestTest.java: ########## @@ -238,6 +238,37 @@ public void testV6AndBelowCannotUseZStdCompression() { // Works fine with current version (>= 7) ProduceRequest.forCurrentMagic(produceData); } + + @Test + public void testNoMixedProducerIds() { + final long producerId1 = 15L; + final long producerId2 = 16L; + final short producerEpoch = 5; + final int sequence = 10; + + final MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, + new SimpleRecord("foo".getBytes())); + final MemoryRecords txnRecords = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId1, + producerEpoch, sequence, new SimpleRecord("bar".getBytes())); + final MemoryRecords idempotentRecords = MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId2, + producerEpoch, sequence, new SimpleRecord("bee".getBytes())); + + + ProduceRequest.Builder requestBuilder = ProduceRequest.forCurrentMagic( + new ProduceRequestData() + .setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList( + new ProduceRequestData.TopicProduceData().setName("foo").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(records))), + new ProduceRequestData.TopicProduceData().setName("bar").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(1).setRecords(txnRecords))), + new ProduceRequestData.TopicProduceData().setName("bee").setPartitionData(Collections.singletonList( + new ProduceRequestData.PartitionProduceData().setIndex(0).setRecords(idempotentRecords)))) + .iterator())) + .setAcks((short) 1) + .setTimeoutMs(5000)); + IntStream.range(3, ApiKeys.PRODUCE.latestVersion()) Review Comment: Another place where knowing why version three is special would be helpful (for me). -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org