Repository: kafka Updated Branches: refs/heads/0.11.0 458a9a4c5 -> 3c3edc9db
KAFKA-5340; Batch splitting should preserve magic and transactional flag Author: Jason Gustafson <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Jiangjie Qin <[email protected]>, Guozhang Wang <[email protected]> Closes #3162 from hachikuji/KAFKA-5340 (cherry picked from commit e4a6b50deca8fabc9880c6764334bfaa830a6d5e) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3c3edc9d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3c3edc9d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3c3edc9d Branch: refs/heads/0.11.0 Commit: 3c3edc9dbaa5d22e0d5c466eb6015d7dea1e0c01 Parents: 458a9a4 Author: Jason Gustafson <[email protected]> Authored: Wed May 31 21:31:52 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed May 31 21:34:11 2017 -0700 ---------------------------------------------------------------------- .../producer/internals/ProducerBatch.java | 30 +++++-- .../producer/internals/RecordAccumulator.java | 13 +-- .../clients/producer/internals/Sender.java | 4 +- .../producer/internals/TransactionManager.java | 9 +++ .../kafka/common/record/MemoryRecords.java | 13 +-- .../common/record/MemoryRecordsBuilder.java | 7 +- .../producer/internals/ProducerBatchTest.java | 43 +++++++++- .../internals/RecordAccumulatorTest.java | 2 +- .../clients/producer/internals/SenderTest.java | 85 +++++++++++++++----- .../common/record/MemoryRecordsBuilderTest.java | 9 +-- 10 files changed, 162 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 974e230..c7253a9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.AbstractRecords; import org.apache.kafka.common.record.CompressionRatioEstimator; +import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.MutableRecordBatch; @@ -43,6 +44,7 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2; import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; @@ -179,23 +181,29 @@ public final class ProducerBatch { public Deque<ProducerBatch> split(int splitBatchSize) { Deque<ProducerBatch> batches = new ArrayDeque<>(); MemoryRecords memoryRecords = recordsBuilder.build(); + Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator(); if (!recordBatchIter.hasNext()) throw new IllegalStateException("Cannot split an empty producer batch."); + RecordBatch recordBatch = recordBatchIter.next(); + if (recordBatch.magic() < MAGIC_VALUE_V2 && !recordBatch.isCompressed()) + throw new IllegalArgumentException("Batch splitting cannot be used with non-compressed messages " + + "with version v0 and v1"); + if (recordBatchIter.hasNext()) - throw new IllegalStateException("A producer batch should only have one record batch."); + throw new IllegalArgumentException("A producer batch should only have one record batch."); Iterator<Thunk> thunkIter = thunks.iterator(); // We always allocate batch size because we are already splitting a big batch. // And we also Retain the create time of the original batch. ProducerBatch batch = null; + for (Record record : recordBatch) { assert thunkIter.hasNext(); Thunk thunk = thunkIter.next(); - if (batch == null) { + if (batch == null) batch = createBatchOffAccumulatorForRecord(record, splitBatchSize); - } // A newly created batch can always host the first message. if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) { @@ -204,6 +212,7 @@ public final class ProducerBatch { batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk); } } + // Close the last batch and add it to the batch list after split. if (batch != null) batches.add(batch); @@ -217,11 +226,19 @@ public final class ProducerBatch { int initialSize = Math.max(AbstractRecords.sizeInBytesUpperBound(magic(), record.key(), record.value(), record.headers()), batchSize); ByteBuffer buffer = ByteBuffer.allocate(initialSize); + + // Note that we intentionally do not set producer state (producerId, epoch, sequence, and isTransactional) + // for the newly created batch. This will be set when the batch is dequeued for sending (which is consistent + // with how normal batches are handled). MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic(), recordsBuilder.compressionType(), - TimestampType.CREATE_TIME, 0L, recordsBuilder.isTransactional()); + TimestampType.CREATE_TIME, 0L); return new ProducerBatch(topicPartition, builder, this.createdMs, true); } + public boolean isCompressed() { + return recordsBuilder.compressionType() != CompressionType.NONE; + } + /** * A callback and the associated FutureRecordMetadata argument to pass to it. */ @@ -329,8 +346,9 @@ public final class ProducerBatch { return recordsBuilder.isFull(); } - public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence) { - recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, baseSequence); + public void setProducerState(ProducerIdAndEpoch producerIdAndEpoch, int baseSequence, boolean isTransactional) { + recordsBuilder.setProducerState(producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, + baseSequence, isTransactional); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 330c244..3f9f4b1 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -238,10 +238,7 @@ public final class RecordAccumulator { throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " + "support the required message format (v2). The broker must be version 0.11 or later."); } - boolean isTransactional = false; - if (transactionManager != null) - isTransactional = transactionManager.isInTransaction(); - return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L, isTransactional); + return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L); } /** @@ -470,11 +467,17 @@ public final class RecordAccumulator { break; } else { ProducerIdAndEpoch producerIdAndEpoch = null; + boolean isTransactional = false; if (transactionManager != null) { + if (!transactionManager.ensurePartitionAdded(tp)) + break; + producerIdAndEpoch = transactionManager.producerIdAndEpoch(); if (!producerIdAndEpoch.isValid()) // we cannot send the batch until we have refreshed the producer id break; + + isTransactional = transactionManager.isInTransaction(); } ProducerBatch batch = deque.pollFirst(); @@ -488,7 +491,7 @@ public final class RecordAccumulator { log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}", node, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, batch.topicPartition, sequenceNumber); - batch.setProducerState(producerIdAndEpoch, sequenceNumber); + batch.setProducerState(producerIdAndEpoch, sequenceNumber, isTransactional); } batch.close(); size += batch.sizeInBytes(); http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index f498f7d..01ff91a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -47,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; @@ -499,7 +500,8 @@ public class Sender implements Runnable { private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) { Errors error = response.error; - if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1) { + if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && + (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) { // If the batch is too large, we split the batch and send the split batches again. We do not decrement // the retry attempts in this case. log.warn("Got error produce response in correlation id {} on topic-partition {}, spitting and retrying ({} attempts left). Error: {}", http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 30fff86..221816c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -232,6 +232,15 @@ public class TransactionManager { return lastError; } + public synchronized boolean ensurePartitionAdded(TopicPartition tp) { + if (isInTransaction() && !partitionsInTransaction.contains(tp)) { + transitionToFatalError(new IllegalStateException("Attempted to dequeue a record batch to send " + + "for partition " + tp + ", which hasn't been added to the transaction yet")); + return false; + } + return true; + } + public String transactionalId() { return transactionalId; } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 1d45635..46798cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -359,14 +359,6 @@ public class MemoryRecords extends AbstractRecords { byte magic, CompressionType compressionType, TimestampType timestampType, - long baseOffset) { - return builder(buffer, magic, compressionType, timestampType, baseOffset, false); - } - - public static MemoryRecordsBuilder builder(ByteBuffer buffer, - byte magic, - CompressionType compressionType, - TimestampType timestampType, long baseOffset, long logAppendTime) { return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, @@ -378,13 +370,12 @@ public class MemoryRecords extends AbstractRecords { byte magic, CompressionType compressionType, TimestampType timestampType, - long baseOffset, - boolean isTransactional) { + long baseOffset) { long logAppendTime = RecordBatch.NO_TIMESTAMP; if (timestampType == TimestampType.LOG_APPEND_TIME) logAppendTime = System.currentTimeMillis(); return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, - RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH); } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 66560ca..89d314d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -52,7 +52,6 @@ public class MemoryRecordsBuilder { private final int initialPosition; private final long baseOffset; private final long logAppendTime; - private final boolean isTransactional; private final boolean isControlBatch; private final int partitionLeaderEpoch; private final int writeLimit; @@ -60,6 +59,7 @@ public class MemoryRecordsBuilder { private volatile float estimatedCompressionRatio; private boolean appendStreamIsClosed = false; + private boolean isTransactional; private long producerId; private short producerEpoch; private int baseSequence; @@ -196,7 +196,7 @@ public class MemoryRecordsBuilder { */ public MemoryRecords build() { if (aborted) { - throw new KafkaException("Attempting to build an aborted record batch"); + throw new IllegalStateException("Attempting to build an aborted record batch"); } close(); return builtRecords; @@ -235,7 +235,7 @@ public class MemoryRecordsBuilder { } } - public void setProducerState(long producerId, short producerEpoch, int baseSequence) { + public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) { if (isClosed()) { // Sequence numbers are assigned when the batch is closed while the accumulator is being drained. // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will @@ -246,6 +246,7 @@ public class MemoryRecordsBuilder { this.producerId = producerId; this.producerEpoch = producerEpoch; this.baseSequence = baseSequence; + this.isTransactional = isTransactional; } public void overrideLastOffset(long lastOffset) { http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index da93015..6d2d2f7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -28,7 +28,11 @@ import org.junit.Test; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Deque; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1; +import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -52,9 +56,9 @@ public class ProducerBatchTest { @Test public void testAppendedChecksumMagicV0AndV1() { - for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) { + for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1)) { MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic, - CompressionType.NONE, TimestampType.CREATE_TIME, 128); + CompressionType.NONE, TimestampType.CREATE_TIME, 0L); ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now); byte[] key = "hi".getBytes(); byte[] value = "there".getBytes(); @@ -67,6 +71,41 @@ public class ProducerBatchTest { } } + @Test + public void testSplitPreservesMagicAndCompressionType() { + for (byte magic : Arrays.asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2)) { + for (CompressionType compressionType : CompressionType.values()) { + if (compressionType == CompressionType.NONE && magic < MAGIC_VALUE_V2) + continue; + + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, + compressionType, TimestampType.CREATE_TIME, 0L); + + ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now); + while (true) { + FutureRecordMetadata future = batch.tryAppend(now, "hi".getBytes(), "there".getBytes(), + Record.EMPTY_HEADERS, null, now); + if (future == null) + break; + } + + Deque<ProducerBatch> batches = batch.split(512); + assertTrue(batches.size() >= 2); + + for (ProducerBatch splitProducerBatch : batches) { + assertEquals(magic, splitProducerBatch.magic()); + assertTrue(splitProducerBatch.isSplitBatch()); + + for (RecordBatch splitBatch : splitProducerBatch.records().batches()) { + assertEquals(magic, splitBatch.magic()); + assertEquals(0L, splitBatch.baseOffset()); + assertEquals(compressionType, splitBatch.compressionType()); + } + } + } + } + } + /** * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create * time is interpreted correctly as not expired when the linger time is larger than the difference http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index e079f2a..2875ba2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -584,7 +584,7 @@ public class RecordAccumulatorTest { assertNotNull(future1); assertNotNull(future2); batch.close(); - // Enqueue the batch to the accumulator so that as if the batch was created by the accumulator. + // Enqueue the batch to the accumulator as if the batch was created by the accumulator. accum.reenqueue(batch, now); time.sleep(101L); // Drain the batch. http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index c1c5a2e..927a937 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -42,7 +42,9 @@ import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceRequest; @@ -96,6 +98,7 @@ public class SenderTest { @Before public void setup() { + client.setNode(cluster.nodes().get(0)); setupWithTransactionState(null); } @@ -548,17 +551,41 @@ public class SenderTest { } @Test - public void testSplitBatchAndSend() throws Exception { + public void testIdempotentSplitBatchAndSend() throws Exception { + TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1); + TransactionManager txnManager = new TransactionManager(); + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + txnManager.setProducerIdAndEpoch(producerIdAndEpoch); + testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp); + } + + @Test + public void testTransactionalSplitBatchAndSend() throws Exception { + ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); + TopicPartition tp = new TopicPartition("testSplitBatchAndSend", 1); + TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 60000); + + setupWithTransactionState(txnManager); + doInitTransactions(txnManager, producerIdAndEpoch); + + txnManager.beginTransaction(); + txnManager.maybeAddPartitionToTransaction(tp); + client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE))); + sender.run(time.milliseconds()); + + testSplitBatchAndSend(txnManager, producerIdAndEpoch, tp); + } + + private void testSplitBatchAndSend(TransactionManager txnManager, + ProducerIdAndEpoch producerIdAndEpoch, + TopicPartition tp) throws Exception { int maxRetries = 1; - String topic = "testSplitBatchAndSend"; + String topic = tp.topic(); // Set a good compression ratio. CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); Metrics m = new Metrics(); - TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0); - ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0); - txnManager.setProducerIdAndEpoch(producerIdAndEpoch); accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, - new ApiVersions(), txnManager); + new ApiVersions(), txnManager); try { Sender sender = new Sender(client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, m, time, REQUEST_TIMEOUT, 1000L, txnManager, new ApiVersions()); @@ -566,14 +593,13 @@ public class SenderTest { Cluster cluster1 = TestUtils.clusterWith(2, topic, 2); metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); // Send the first message. - final TopicPartition tp2 = new TopicPartition(topic, 1); Future<RecordMetadata> f1 = - accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future; + accumulator.append(tp, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future; Future<RecordMetadata> f2 = - accumulator.append(tp2, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future; + accumulator.append(tp, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); // connect sender.run(time.milliseconds()); // send produce request - assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue()); + assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue()); String id = client.requests().peek().destination(); assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey()); Node node = new Node(Integer.valueOf(id), "localhost", 0); @@ -581,14 +607,14 @@ public class SenderTest { assertTrue("Client ready status should be true", client.isReady(node, 0L)); Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(); - responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE)); + responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE)); client.respond(new ProduceResponse(responseMap)); sender.run(time.milliseconds()); // split and reenqueue // The compression ratio should have been improved once. assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP, CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01); sender.run(time.milliseconds()); // send produce request - assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue()); + assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp).longValue()); assertFalse("The future shouldn't have been done.", f1.isDone()); assertFalse("The future shouldn't have been done.", f2.isDone()); id = client.requests().peek().destination(); @@ -597,12 +623,13 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); - responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L)); - client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 0, false), new ProduceResponse(responseMap)); + responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L)); + client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 0, txnManager.isInTransaction()), + new ProduceResponse(responseMap)); sender.run(time.milliseconds()); // receive assertTrue("The future should have been done.", f1.isDone()); - assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue()); + assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp).longValue()); assertFalse("The future shouldn't have been done.", f2.isDone()); assertEquals("Offset of the first message should be 0", 0L, f1.get().offset()); sender.run(time.milliseconds()); // send produce request @@ -612,14 +639,15 @@ public class SenderTest { assertEquals(1, client.inFlightRequestCount()); assertTrue("Client ready status should be true", client.isReady(node, 0L)); - responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L)); - client.respond(produceRequestMatcher(tp2, producerIdAndEpoch, 1, false), new ProduceResponse(responseMap)); + responseMap.put(tp, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L)); + client.respond(produceRequestMatcher(tp, producerIdAndEpoch, 1, txnManager.isInTransaction()), + new ProduceResponse(responseMap)); sender.run(time.milliseconds()); // receive assertTrue("The future should have been done.", f2.isDone()); - assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp2).longValue()); + assertEquals("The sequence number should be 2", 2, txnManager.sequenceNumber(tp).longValue()); assertEquals("Offset of the first message should be 1", 1L, f2.get().offset()); - assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp2).isEmpty()); + assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp).isEmpty()); assertTrue("There should be a split", m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0); @@ -713,4 +741,23 @@ public class SenderTest { sender.run(time.milliseconds()); } + private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) { + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE); + sender.run(time.milliseconds()); + sender.run(time.milliseconds()); + + prepareInitPidResponse(Errors.NONE, producerIdAndEpoch.producerId, producerIdAndEpoch.epoch); + sender.run(time.milliseconds()); + assertTrue(transactionManager.hasProducerId()); + } + + private void prepareFindCoordinatorResponse(Errors error) { + client.prepareResponse(new FindCoordinatorResponse(error, cluster.nodes().get(0))); + } + + private void prepareInitPidResponse(Errors error, long pid, short epoch) { + client.prepareResponse(new InitProducerIdResponse(0, error, pid, epoch)); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/3c3edc9d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index c621d53..9734f59 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.record; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -491,18 +490,18 @@ public class MemoryRecordsBuilderTest { } @Test - public void shouldThrowKafkaExceptionOnBuildWhenAborted() throws Exception { + public void shouldThrowIllegalStateExceptionOnBuildWhenAborted() throws Exception { ByteBuffer buffer = ByteBuffer.allocate(128); buffer.position(bufferOffset); MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, - TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, - false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); builder.abort(); try { builder.build(); fail("Should have thrown KafkaException"); - } catch (KafkaException e) { + } catch (IllegalStateException e) { // ok } }
