Repository: kafka Updated Branches: refs/heads/trunk 1c7fdd284 -> 7fad45557
KAFKA-3995; KIP-126 Allow KafkaProducer to split and resend oversized batches Author: Jiangjie Qin <[email protected]> Reviewers: Joel Koshy <[email protected]>, Ismael Juma <[email protected]> Closes #2638 from becketqin/KAFKA-3995 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7fad4555 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7fad4555 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7fad4555 Branch: refs/heads/trunk Commit: 7fad45557e4cb7b345f34cec32f910b437c59bc2 Parents: 1c7fdd2 Author: Jiangjie Qin <[email protected]> Authored: Sun May 21 17:31:31 2017 -0700 Committer: Jiangjie Qin <[email protected]> Committed: Sun May 21 17:31:31 2017 -0700 ---------------------------------------------------------------------- checkstyle/checkstyle.xml | 2 +- .../internals/FutureRecordMetadata.java | 34 +++- .../producer/internals/ProducerBatch.java | 142 ++++++++++++- .../producer/internals/RecordAccumulator.java | 37 +++- .../clients/producer/internals/Sender.java | 29 ++- .../kafka/common/record/AbstractRecords.java | 4 + .../record/CompressionRatioEstimator.java | 111 ++++++++++ .../kafka/common/record/CompressionType.java | 6 +- .../kafka/common/record/DefaultRecord.java | 8 +- .../kafka/common/record/DefaultRecordBatch.java | 8 + .../common/record/MemoryRecordsBuilder.java | 78 ++++--- .../internals/RecordAccumulatorTest.java | 203 ++++++++++++++++++- .../clients/producer/internals/SenderTest.java | 90 ++++++++ .../common/record/MemoryRecordsBuilderTest.java | 9 +- 14 files changed, 701 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 9f9e9ae..cf57a50 100644 --- a/checkstyle/checkstyle.xml +++ 
b/checkstyle/checkstyle.xml @@ -105,7 +105,7 @@ </module> <module name="ClassDataAbstractionCoupling"> <!-- default is 7 --> - <property name="max" value="15"/> + <property name="max" value="17"/> </module> <module name="BooleanExpressionComplexity"> <!-- default is 3 --> http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index f8b38e8..1de965f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -34,6 +34,7 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { private final long checksum; private final int serializedKeySize; private final int serializedValueSize; + private volatile FutureRecordMetadata nextRecordMetadata = null; public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp, long checksum, int serializedKeySize, int serializedValueSize) { @@ -58,25 +59,54 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { @Override public RecordMetadata get() throws InterruptedException, ExecutionException { this.result.await(); + if (nextRecordMetadata != null) + return nextRecordMetadata.get(); return valueOrError(); } @Override public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + // Handle overflow. + long now = System.currentTimeMillis(); + long deadline = Long.MAX_VALUE - timeout < now ? 
Long.MAX_VALUE : now + timeout; boolean occurred = this.result.await(timeout, unit); + if (nextRecordMetadata != null) + return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (!occurred) throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms."); return valueOrError(); } + /** + * This method is used when we have to split a large batch in smaller ones. A chained metadata will allow the + * future that has already returned to the users to wait on the newly created split batches even after the + * old big batch has been deemed as done. + */ + void chain(FutureRecordMetadata futureRecordMetadata) { + if (nextRecordMetadata == null) + nextRecordMetadata = futureRecordMetadata; + else + nextRecordMetadata.chain(futureRecordMetadata); + } + RecordMetadata valueOrError() throws ExecutionException { if (this.result.error() != null) throw new ExecutionException(this.result.error()); else return value(); } - + + long checksum() { + return this.checksum; + } + + long relativeOffset() { + return this.relativeOffset; + } + RecordMetadata value() { + if (nextRecordMetadata != null) + return nextRecordMetadata.value(); return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset, timestamp(), this.checksum, this.serializedKeySize, this.serializedValueSize); } @@ -87,6 +117,8 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { @Override public boolean isDone() { + if (nextRecordMetadata != null) + return nextRecordMetadata.isDone(); return this.result.completed(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index 1c078c8..cdf85ce 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -16,15 +16,26 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordBatchTooLargeException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionRatioEstimator; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.ProduceResponse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +45,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; +import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP; + + /** * A batch of records that is or will be sent. 
* @@ -51,6 +65,7 @@ public final class ProducerBatch { private final MemoryRecordsBuilder recordsBuilder; private final AtomicInteger attempts = new AtomicInteger(0); + private final boolean isSplitBatch; int recordCount; int maxRecordSize; private long lastAttemptMs; @@ -61,6 +76,10 @@ public final class ProducerBatch { private boolean retry; public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now) { + this(tp, recordsBuilder, now, false); + } + + public ProducerBatch(TopicPartition tp, MemoryRecordsBuilder recordsBuilder, long now, boolean isSplitBatch) { this.createdMs = now; this.lastAttemptMs = now; this.recordsBuilder = recordsBuilder; @@ -69,6 +88,10 @@ public final class ProducerBatch { this.produceFuture = new ProduceRequestResult(topicPartition); this.completed = new AtomicBoolean(); this.retry = false; + this.isSplitBatch = isSplitBatch; + float compressionRatioEstimation = CompressionRatioEstimator.estimation(topicPartition.topic(), + recordsBuilder.compressionType()); + recordsBuilder.setEstimatedCompressionRatio(compressionRatioEstimation); } /** @@ -87,14 +110,39 @@ public final class ProducerBatch { timestamp, checksum, key == null ? -1 : key.length, value == null ? -1 : value.length); - if (callback != null) - thunks.add(new Thunk(callback, future)); + // we have to keep every future returned to the users in case the batch needs to be + // split to several new batches and resent. + thunks.add(new Thunk(callback, future)); this.recordCount++; return future; } } /** + + * This method is only used by {@link #split(int)} when splitting a large batch to smaller ones. + + * @return true if the record has been successfully appended, false otherwise. + + */ + private boolean tryAppendForSplit(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers, Thunk thunk) { + if (!recordsBuilder.hasRoomFor(timestamp, key, value)) { + return false; + } else { + // No need to get the CRC. 
+ this.recordsBuilder.append(timestamp, key, value); + this.maxRecordSize = Math.max(this.maxRecordSize, + AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers)); + FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, + timestamp, thunk.future.checksum(), + key == null ? -1 : key.remaining(), + value == null ? -1 : value.remaining()); + // Chain the future to the original thunk. + thunk.future.chain(future); + this.thunks.add(thunk); + this.recordCount++; + return true; + } + } + + /** * Complete the request. * * @param baseOffset The base offset of the messages assigned by the server @@ -116,9 +164,11 @@ public final class ProducerBatch { try { if (exception == null) { RecordMetadata metadata = thunk.future.value(); - thunk.callback.onCompletion(metadata, null); + if (thunk.callback != null) + thunk.callback.onCompletion(metadata, null); } else { - thunk.callback.onCompletion(null, exception); + if (thunk.callback != null) + thunk.callback.onCompletion(null, exception); } } catch (Exception e) { log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e); @@ -128,6 +178,71 @@ public final class ProducerBatch { produceFuture.done(); } + public Deque<ProducerBatch> split(int splitBatchSize) { + Deque<ProducerBatch> batches = new ArrayDeque<>(); + MemoryRecords memoryRecords = recordsBuilder.build(); + Iterator<MutableRecordBatch> recordBatchIter = memoryRecords.batches().iterator(); + if (!recordBatchIter.hasNext()) + throw new IllegalStateException("Cannot split an empty producer batch."); + RecordBatch recordBatch = recordBatchIter.next(); + if (recordBatchIter.hasNext()) + throw new IllegalStateException("A producer batch should only have one record batch."); + + Iterator<Thunk> thunkIter = thunks.iterator(); + // We always allocate batch size because we are already splitting a big batch. + // And we also Retain the create time of the original batch. 
+ ProducerBatch batch = null; + for (Record record : recordBatch) { + assert thunkIter.hasNext(); + Thunk thunk = thunkIter.next(); + if (batch == null) { + batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(), + record, splitBatchSize, this.createdMs); + } + + // A newly created batch can always host the first message. + if (!batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk)) { + batches.add(batch); + batch = createBatchOffAccumulatorForRecord(this.topicPartition, this.recordsBuilder.compressionType(), + record, splitBatchSize, this.createdMs); + batch.tryAppendForSplit(record.timestamp(), record.key(), record.value(), record.headers(), thunk); + } + } + // Close the last batch and add it to the batch list after split. + if (batch != null) + batches.add(batch); + + produceFuture.set(ProduceResponse.INVALID_OFFSET, NO_TIMESTAMP, new RecordBatchTooLargeException()); + produceFuture.done(); + return batches; + } + + private ProducerBatch createBatchOffAccumulatorForRecord(TopicPartition tp, + CompressionType compressionType, + Record record, + int batchSize, + long createdMs) { + int initialSize = Math.max(Records.LOG_OVERHEAD + AbstractRecords.sizeInBytesUpperBound(magic(), + record.key(), + record.value(), + record.headers()), + batchSize); + return createBatchOffAccumulator(tp, compressionType, initialSize, createdMs); + } + + // package private for testing purpose. + static ProducerBatch createBatchOffAccumulator(TopicPartition tp, + CompressionType compressionType, + int batchSize, + long createdMs) { + ByteBuffer buffer = ByteBuffer.allocate(batchSize); + MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, + compressionType, + TimestampType.CREATE_TIME, + batchSize); + return new ProducerBatch(tp, builder, createdMs, true); + } + /** * A callback and the associated FutureRecordMetadata argument to pass to it. 
*/ @@ -135,7 +250,7 @@ public final class ProducerBatch { final Callback callback; final FutureRecordMetadata future; - public Thunk(Callback callback, FutureRecordMetadata future) { + Thunk(Callback callback, FutureRecordMetadata future) { this.callback = callback; this.future = future; } @@ -155,7 +270,7 @@ public final class ProducerBatch { * This methods closes this batch and sets {@code expiryErrorMessage} if the batch has timed out. * {@link #expirationDone()} must be invoked to complete the produce future and invoke callbacks. */ - public boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { + boolean maybeExpire(int requestTimeoutMs, long retryBackoffMs, long now, long lingerMs, boolean isFull) { if (!this.inRetry() && isFull && requestTimeoutMs < (now - this.lastAppendTime)) expiryErrorMessage = (now - this.lastAppendTime) + " ms has passed since last append"; else if (!this.inRetry() && requestTimeoutMs < (createdTimeMs(now) - lingerMs)) @@ -177,7 +292,7 @@ public final class ProducerBatch { void expirationDone() { if (expiryErrorMessage == null) throw new IllegalStateException("Batch has not expired"); - this.done(-1L, RecordBatch.NO_TIMESTAMP, + this.done(-1L, NO_TIMESTAMP, new TimeoutException("Expiring " + recordCount + " record(s) for " + topicPartition + ": " + expiryErrorMessage)); } @@ -208,6 +323,10 @@ public final class ProducerBatch { this.drainedMs = Math.max(drainedMs, nowMs); } + boolean isSplitBatch() { + return isSplitBatch; + } + /** * Returns if the batch is been retried for sending to kafka */ @@ -223,8 +342,8 @@ public final class ProducerBatch { return recordsBuilder.sizeInBytes(); } - public double compressionRate() { - return recordsBuilder.compressionRate(); + public double compressionRatio() { + return recordsBuilder.compressionRatio(); } public boolean isFull() { @@ -245,6 +364,11 @@ public final class ProducerBatch { public void close() { recordsBuilder.close(); + if 
(!recordsBuilder.isControlBatch()) { + CompressionRatioEstimator.updateEstimation(topicPartition.topic(), + recordsBuilder.compressionType(), + (float) recordsBuilder.compressionRatio()); + } } public void abort() { http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index 5b8fb96..e1f04a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.metrics.stats.Rate; import org.apache.kafka.common.record.AbstractRecords; +import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.RecordBatch; @@ -325,6 +326,30 @@ public final class RecordAccumulator { } /** + * Split the big batch that has been rejected and reenqueue the split batches in to the accumulator. + * @return the number of split batches. + */ + public int splitAndReenqueue(ProducerBatch bigBatch) { + // Reset the estimated compression ratio to the initial value or the big batch compression ratio, whichever + // is bigger. There are several different ways to do the reset. We chose the most conservative one to ensure + // the split doesn't happen too often. 
+ CompressionRatioEstimator.setEstimation(bigBatch.topicPartition.topic(), compression, + Math.max(1.0f, (float) bigBatch.compressionRatio())); + Deque<ProducerBatch> dq = bigBatch.split(this.batchSize); + int numSplitBatches = dq.size(); + Deque<ProducerBatch> partitionDequeue = getOrCreateDeque(bigBatch.topicPartition); + while (!dq.isEmpty()) { + ProducerBatch batch = dq.pollLast(); + incomplete.add(batch); + // We treat the newly split batches as if they are not even tried. + synchronized (partitionDequeue) { + partitionDequeue.addFirst(batch); + } + } + return numSplitBatches; + } + + /** * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated * partition batches. @@ -506,7 +531,17 @@ public final class RecordAccumulator { */ public void deallocate(ProducerBatch batch) { incomplete.remove(batch); - free.deallocate(batch.buffer(), batch.initialCapacity()); + // Only deallocate the batch if it is not a split batch because split batch are allocated aside the + // buffer pool. + if (!batch.isSplitBatch()) + free.deallocate(batch.buffer(), batch.initialCapacity()); + } + + /** + * Package private for unit test. Get the buffer pool remaining size in bytes. 
+ */ + long bufferPoolAvailableMemory() { + return free.availableMemory(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 8dea9c6..4c3b99d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -185,7 +185,7 @@ public class Sender implements Runnable { /** * Run a single iteration of sending - * + * * @param now The current POSIX time in milliseconds */ void run(long now) { @@ -478,7 +478,7 @@ public class Sender implements Runnable { /** * Complete or retry the given batch of records. - * + * * @param batch The record batch * @param response The produce response * @param correlationId The correlation id for the request @@ -487,7 +487,18 @@ public class Sender implements Runnable { private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId, long now) { Errors error = response.error; - if (error != Errors.NONE) { + if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1) { + // If the batch is too large, we split the batch and send the split batches again. We do not decrement + // the retry attempts in this case. + log.warn("Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). 
Error: {}", + correlationId, + batch.topicPartition, + this.retries - batch.attempts(), + error); + this.accumulator.splitAndReenqueue(batch); + this.accumulator.deallocate(batch); + this.sensors.recordBatchSplit(); + } else if (error != Errors.NONE) { if (canRetry(batch, error)) { log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}", correlationId, @@ -656,6 +667,7 @@ public class Sender implements Runnable { public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; public final Sensor produceThrottleTimeSensor; + public final Sensor batchSplitSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; @@ -721,6 +733,10 @@ public class Sender implements Runnable { return (now - metadata.lastSuccessfulUpdate()) / 1000.0; } }); + + this.batchSplitSensor = metrics.sensor("batch-split-rate"); + m = metrics.metricName("batch-split-rate", metricGrpName, "The rate of record batch split"); + this.batchSplitSensor.add(m, new Rate()); } private void maybeRegisterTopicMetrics(String topic) { @@ -780,12 +796,12 @@ public class Sender implements Runnable { // per-topic compression rate String topicCompressionRateName = "topic." 
+ topic + ".compression-rate"; Sensor topicCompressionRate = Utils.notNull(this.metrics.getSensor(topicCompressionRateName)); - topicCompressionRate.record(batch.compressionRate()); + topicCompressionRate.record(batch.compressionRatio()); // global metrics this.batchSizeSensor.record(batch.sizeInBytes(), now); this.queueTimeSensor.record(batch.queueTimeMs(), now); - this.compressionRateSensor.record(batch.compressionRate()); + this.compressionRateSensor.record(batch.compressionRatio()); this.maxRecordSizeSensor.record(batch.maxRecordSize, now); records += batch.recordCount; } @@ -826,6 +842,9 @@ public class Sender implements Runnable { this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds()); } + void recordBatchSplit() { + this.batchSplitSensor.record(); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index cfda8a4..2771ab7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -165,6 +165,10 @@ public abstract class AbstractRecords implements Records { } public static int sizeInBytesUpperBound(byte magic, byte[] key, byte[] value, Header[] headers) { + return sizeInBytesUpperBound(magic, Utils.wrapNullable(key), Utils.wrapNullable(value), headers); + } + + public static int sizeInBytesUpperBound(byte magic, ByteBuffer key, ByteBuffer value, Header[] headers) { if (magic >= RecordBatch.MAGIC_VALUE_V2) return DefaultRecordBatch.batchSizeUpperBound(key, value, headers); else 
http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java new file mode 100644 index 0000000..7f11784 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.record; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + + +/** + * This class helps estimate the compression ratio for each topic and compression type combination. + */ +public class CompressionRatioEstimator { + // The constant speed to increase compression ratio when a batch compresses better than expected. + public static final float COMPRESSION_RATIO_IMPROVING_STEP = 0.005f; + // The minimum speed to decrease compression ratio when a batch compresses worse than expected. 
+ public static final float COMPRESSION_RATIO_DETERIORATE_STEP = 0.05f; + private static final ConcurrentMap<String, float[]> COMPRESSION_RATIO = new ConcurrentHashMap<>(); + + /** + * Update the compression ratio estimation for a topic and compression type. + * + * @param topic the topic to update compression ratio estimation. + * @param type the compression type. + * @param observedRatio the observed compression ratio. + * @return the compression ratio estimation after the update. + */ + public static float updateEstimation(String topic, CompressionType type, float observedRatio) { + float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic); + float currentEstimation = compressionRatioForTopic[type.id]; + synchronized (compressionRatioForTopic) { + if (observedRatio > currentEstimation) + compressionRatioForTopic[type.id] = Math.max(currentEstimation + COMPRESSION_RATIO_DETERIORATE_STEP, observedRatio); + else if (observedRatio < currentEstimation) { + compressionRatioForTopic[type.id] = currentEstimation - COMPRESSION_RATIO_IMPROVING_STEP; + } + } + return compressionRatioForTopic[type.id]; + } + + /** + * Get the compression ratio estimation for a topic and compression type. + */ + public static float estimation(String topic, CompressionType type) { + float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic); + return compressionRatioForTopic[type.id]; + } + + /** + * Reset the compression ratio estimation to the initial values for a topic. + */ + public static void resetEstimation(String topic) { + float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic); + synchronized (compressionRatioForTopic) { + for (CompressionType type : CompressionType.values()) { + compressionRatioForTopic[type.id] = type.rate; + } + } + } + + /** + * Remove the compression ratio estimation for a topic. 
+ */ + public static void removeEstimation(String topic) { + COMPRESSION_RATIO.remove(topic); + } + + /** + * Set the compression estimation for a topic compression type combination. This method is for unit test purpose. + */ + public static void setEstimation(String topic, CompressionType type, float ratio) { + float[] compressionRatioForTopic = getAndCreateEstimationIfAbsent(topic); + synchronized (compressionRatioForTopic) { + compressionRatioForTopic[type.id] = ratio; + } + } + + private static float[] getAndCreateEstimationIfAbsent(String topic) { + float[] compressionRatioForTopic = COMPRESSION_RATIO.get(topic); + if (compressionRatioForTopic == null) { + compressionRatioForTopic = initialCompressionRatio(); + float[] existingCompressionRatio = COMPRESSION_RATIO.putIfAbsent(topic, compressionRatioForTopic); + // Someone created the compression ratio array before us, use it. + if (existingCompressionRatio != null) + return existingCompressionRatio; + } + return compressionRatioForTopic; + } + + private static float[] initialCompressionRatio() { + float[] compressionRatio = new float[CompressionType.values().length]; + for (CompressionType type : CompressionType.values()) { + compressionRatio[type.id] = type.rate; + } + return compressionRatio; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java index a78c5a2..15b5958 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionType.java @@ -44,7 +44,7 @@ public enum CompressionType { } }, - GZIP(1, "gzip", 0.5f) { + GZIP(1, "gzip", 1.0f) { @Override public OutputStream 
wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) { try { @@ -64,7 +64,7 @@ public enum CompressionType { } }, - SNAPPY(2, "snappy", 0.5f) { + SNAPPY(2, "snappy", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) { try { @@ -84,7 +84,7 @@ public enum CompressionType { } }, - LZ4(3, "lz4", 0.5f) { + LZ4(3, "lz4", 1.0f) { @Override public OutputStream wrapForOutput(ByteBufferOutputStream buffer, byte messageVersion, int bufferSize) { try { http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index 669c75d..37f92d2 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -494,8 +494,12 @@ public class DefaultRecord implements Record { } static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) { - int keySize = key == null ? -1 : key.length; - int valueSize = value == null ? -1 : value.length; + return recordSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers); + } + + static int recordSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) { + int keySize = key == null ? -1 : key.remaining(); + int valueSize = value == null ? 
-1 : value.remaining(); return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 74bd3c0..589e67c 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -30,6 +30,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import org.apache.kafka.common.utils.Utils; import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; @@ -440,6 +441,13 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe * Get an upper bound on the size of a batch with only a single record using a given key and value. */ static int batchSizeUpperBound(byte[] key, byte[] value, Header[] headers) { + return batchSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers); + } + + /** + * Get an upper bound on the size of a batch with only a single record using a given key and value. 
+ */ + static int batchSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) { return RECORD_BATCH_OVERHEAD + DefaultRecord.recordSizeUpperBound(key, value, headers); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 6f90fac..42ae0f8 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -37,22 +37,9 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable; * This will release resources like compression buffers that can be relatively large (64 KB for LZ4). */ public class MemoryRecordsBuilder { - private static final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f; private static final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f; private static final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024; - private static final float[] TYPE_TO_RATE; - - static { - int maxTypeId = -1; - for (CompressionType type : CompressionType.values()) - maxTypeId = Math.max(maxTypeId, type.id); - TYPE_TO_RATE = new float[maxTypeId + 1]; - for (CompressionType type : CompressionType.values()) { - TYPE_TO_RATE[type.id] = type.rate; - } - } - private final TimestampType timestampType; private final CompressionType compressionType; // Used to append records, may compress data on the fly @@ -71,13 +58,15 @@ public class MemoryRecordsBuilder { private final int writeLimit; private final int initialCapacity; + private volatile float estimatedCompressionRatio; + private boolean appendStreamIsClosed = false; private long producerId; private short producerEpoch; private int baseSequence; private long writtenUncompressed 
= 0; private int numRecords = 0; - private float compressionRate = 1; + private float actualCompressionRatio = 1; private long maxTimestamp = RecordBatch.NO_TIMESTAMP; private long offsetOfMaxTimestamp = -1; private Long lastOffset = null; @@ -134,7 +123,7 @@ public class MemoryRecordsBuilder { this.initPos = buffer.position(); this.numRecords = 0; this.writtenUncompressed = 0; - this.compressionRate = 1; + this.actualCompressionRatio = 1; this.maxTimestamp = RecordBatch.NO_TIMESTAMP; this.producerId = producerId; this.producerEpoch = producerEpoch; @@ -167,8 +156,16 @@ public class MemoryRecordsBuilder { return initialCapacity; } - public double compressionRate() { - return compressionRate; + public double compressionRatio() { + return actualCompressionRatio; + } + + public CompressionType compressionType() { + return compressionType; + } + + public boolean isControlBatch() { + return isControlBatch; } /** @@ -284,9 +281,9 @@ public class MemoryRecordsBuilder { builtRecords = MemoryRecords.EMPTY; } else { if (magic > RecordBatch.MAGIC_VALUE_V1) - writeDefaultBatchHeader(); + this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.writtenUncompressed; else if (compressionType != CompressionType.NONE) - writeLegacyCompressedWrapperHeader(); + this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.writtenUncompressed; ByteBuffer buffer = buffer().duplicate(); buffer.flip(); @@ -295,12 +292,17 @@ public class MemoryRecordsBuilder { } } - private void writeDefaultBatchHeader() { + /** + * Write the header to the default batch. + * @return the written compressed bytes. 
+ */ + private int writeDefaultBatchHeader() { ensureOpenForRecordBatchWrite(); ByteBuffer buffer = bufferStream.buffer(); int pos = buffer.position(); buffer.position(initPos); int size = pos - initPos; + int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD; int offsetDelta = (int) (lastOffset - baseOffset); final long baseTimestamp; @@ -318,9 +320,14 @@ public class MemoryRecordsBuilder { partitionLeaderEpoch, numRecords); buffer.position(pos); + return writtenCompressed; } - private void writeLegacyCompressedWrapperHeader() { + /** + * Write the header to the legacy batch. + * @return the written compressed bytes. + */ + private int writeLegacyCompressedWrapperHeader() { ensureOpenForRecordBatchWrite(); ByteBuffer buffer = bufferStream.buffer(); int pos = buffer.position(); @@ -334,11 +341,7 @@ public class MemoryRecordsBuilder { LegacyRecord.writeCompressedRecordHeader(buffer, magic, wrapperSize, timestamp, compressionType, timestampType); buffer.position(pos); - - // update the compression ratio - this.compressionRate = (float) writtenCompressed / this.writtenUncompressed; - TYPE_TO_RATE[compressionType.id] = TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_DAMPING_FACTOR + - compressionRate * (1 - COMPRESSION_RATE_DAMPING_FACTOR); + return writtenCompressed; } private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, @@ -440,7 +443,7 @@ public class MemoryRecordsBuilder { public long append(long timestamp, ByteBuffer key, ByteBuffer value) { return append(timestamp, key, value, Record.EMPTY_HEADERS); } - + /** * Append a new record at the next sequential offset. 
* @param timestamp The record timestamp @@ -636,11 +639,25 @@ public class MemoryRecordsBuilder { return buffer().position(); } else { // estimate the written bytes to the underlying byte buffer based on uncompressed written bytes - return (int) (writtenUncompressed * TYPE_TO_RATE[compressionType.id] * COMPRESSION_RATE_ESTIMATION_FACTOR); + return (int) (writtenUncompressed * estimatedCompressionRatio * COMPRESSION_RATE_ESTIMATION_FACTOR); } } /** + * Set the estimated compression ratio for the memory records builder. + */ + public void setEstimatedCompressionRatio(float estimatedCompressionRatio) { + this.estimatedCompressionRatio = estimatedCompressionRatio; + } + + /** + * Check if we have room for a new record containing the given key/value pair + */ + public boolean hasRoomFor(long timestamp, byte[] key, byte[] value) { + return hasRoomFor(timestamp, wrapNullable(key), wrapNullable(value)); + } + + /** * Check if we have room for a new record containing the given key/value pair * * Note that the return value is based on the estimate of the bytes written to the compressor, which may not be @@ -652,7 +669,7 @@ public class MemoryRecordsBuilder { * the checking should be based on the capacity of the initialized buffer rather than the write limit in order * to accept this single record. */ - public boolean hasRoomFor(long timestamp, byte[] key, byte[] value) { + public boolean hasRoomFor(long timestamp, ByteBuffer key, ByteBuffer value) { if (isFull()) return false; @@ -662,9 +679,10 @@ public class MemoryRecordsBuilder { } else { int nextOffsetDelta = lastOffset == null ? 0 : (int) (lastOffset - baseOffset + 1); long timestampDelta = baseTimestamp == null ? 
0 : timestamp - baseTimestamp; - recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value); + recordSize = DefaultRecord.sizeInBytes(nextOffsetDelta, timestampDelta, key, value, Record.EMPTY_HEADERS); } + // Be conservative and not take compression of the new record into consideration. return numRecords == 0 ? this.initialCapacity >= recordSize : this.writeLimit >= estimatedBytesWritten() + recordSize; http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index af599ca..b9675c3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.clients.producer.internals; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.producer.Callback; @@ -28,6 +31,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.DefaultRecord; @@ -55,6 +59,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.util.Arrays.asList; 
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -311,7 +316,7 @@ public class RecordAccumulatorTest { assertEquals("Node1 should only have one batch drained.", 1, batches.get(0).size()); assertEquals("Node1 should only have one batch for partition 0.", tp1, batches.get(0).get(0).topicPartition); } - + @Test public void testFlush() throws Exception { long lingerMs = Long.MAX_VALUE; @@ -321,16 +326,16 @@ public class RecordAccumulatorTest { accum.append(new TopicPartition(topic, i % 3), 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs); RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); - + accum.beginFlush(); result = accum.ready(cluster, time.milliseconds()); - + // drain and deallocate all batches Map<Integer, List<ProducerBatch>> results = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); for (List<ProducerBatch> batches: results.values()) for (ProducerBatch batch: batches) accum.deallocate(batch); - + // should be complete with no unsent records. 
accum.awaitFlushCompletion(); assertFalse(accum.hasUnsent()); @@ -552,6 +557,196 @@ public class RecordAccumulatorTest { accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, 0); } + @Test + public void testSplitAndReenqueue() throws ExecutionException, InterruptedException { + long now = time.milliseconds(); + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10, 100L, metrics, time, + new ApiVersions(), null); + // Create a big batch + ProducerBatch batch = ProducerBatch.createBatchOffAccumulator(tp1, CompressionType.NONE, 4096, now); + byte[] value = new byte[1024]; + final AtomicInteger acked = new AtomicInteger(0); + Callback cb = new Callback() { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + acked.incrementAndGet(); + } + }; + // Append two messages so the batch is too big. + Future<RecordMetadata> future1 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now); + Future<RecordMetadata> future2 = batch.tryAppend(now, null, value, Record.EMPTY_HEADERS, cb, now); + assertNotNull(future1); + assertNotNull(future2); + batch.close(); + // Enqueue the batch to the accumulator so that as if the batch was created by the accumulator. + accum.reenqueue(batch, now); + time.sleep(101L); + // Drain the batch. + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertTrue("The batch should be ready", result.readyNodes.size() > 0); + Map<Integer, List<ProducerBatch>> drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals("Only node1 should be drained", 1, drained.size()); + assertEquals("Only one batch should be drained", 1, drained.get(node1.id()).size()); + // Split and reenqueue the batch. 
+ accum.splitAndReenqueue(drained.get(node1.id()).get(0)); + time.sleep(101L); + + drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertFalse(drained.isEmpty()); + assertFalse(drained.get(node1.id()).isEmpty()); + drained.get(node1.id()).get(0).done(acked.get(), 100L, null); + assertEquals("The first message should have been acked.", 1, acked.get()); + assertTrue(future1.isDone()); + assertEquals(0, future1.get().offset()); + + drained = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertFalse(drained.isEmpty()); + assertFalse(drained.get(node1.id()).isEmpty()); + drained.get(node1.id()).get(0).done(acked.get(), 100L, null); + assertEquals("Both message should have been acked.", 2, acked.get()); + assertTrue(future2.isDone()); + assertEquals(1, future2.get().offset()); + } + + @Test + public void testSplitBatchOffAccumulator() throws InterruptedException { + long seed = System.currentTimeMillis(); + final int batchSize = 1024; + final int bufferCapacity = 3 * 1024; + + // First set the compression ratio estimation to be good. + CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); + RecordAccumulator accum = new RecordAccumulator(batchSize, bufferCapacity, CompressionType.GZIP, 0L, 100L, + metrics, time, new ApiVersions(), null); + int numSplitBatches = prepareSplitBatches(accum, seed, 100, 20); + assertTrue("There should be some split batches", numSplitBatches > 0); + // Drain all the split batches. 
+ RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + for (int i = 0; i < numSplitBatches; i++) { + Map<Integer, List<ProducerBatch>> drained = + accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertFalse(drained.isEmpty()); + assertFalse(drained.get(node1.id()).isEmpty()); + } + assertTrue("All the batches should have been drained.", + accum.ready(cluster, time.milliseconds()).readyNodes.isEmpty()); + assertEquals("The split batches should be allocated off the accumulator", + bufferCapacity, accum.bufferPoolAvailableMemory()); + } + + @Test + public void testSplitFrequency() throws InterruptedException { + long seed = System.currentTimeMillis(); + Random random = new Random(); + random.setSeed(seed); + final int batchSize = 1024; + final int numMessages = 1000; + + RecordAccumulator accum = new RecordAccumulator(batchSize, 3 * 1024, CompressionType.GZIP, 10, 100L, + metrics, time, new ApiVersions(), null); + // Adjust the high and low compression ratio message percentage + for (int goodCompRatioPercentage = 1; goodCompRatioPercentage < 100; goodCompRatioPercentage++) { + int numSplit = 0; + int numBatches = 0; + CompressionRatioEstimator.resetEstimation(topic); + for (int i = 0; i < numMessages; i++) { + int dice = random.nextInt(100); + byte[] value = (dice < goodCompRatioPercentage) ? + bytesWithGoodCompression(random) : bytesWithPoorCompression(random, 100); + accum.append(tp1, 0L, null, value, Record.EMPTY_HEADERS, null, 0); + BatchDrainedResult result = completeOrSplitBatches(accum, batchSize); + numSplit += result.numSplit; + numBatches += result.numBatches; + } + time.sleep(10); + BatchDrainedResult result = completeOrSplitBatches(accum, batchSize); + numSplit += result.numSplit; + numBatches += result.numBatches; + assertTrue(String.format("Total num batches = %d, split batches = %d, more than 10%% of the batch splits. 
" + + "Random seed is " + seed, + numBatches, numSplit), (double) numSplit / numBatches < 0.1f); + } + } + + private int prepareSplitBatches(RecordAccumulator accum, long seed, int recordSize, int numRecords) + throws InterruptedException { + Random random = new Random(); + random.setSeed(seed); + + // First set the compression ratio estimation to be good. + CompressionRatioEstimator.setEstimation(tp1.topic(), CompressionType.GZIP, 0.1f); + // Append 20 records of 100 bytes size with poor compression ratio should make the batch too big. + for (int i = 0; i < numRecords; i++) { + accum.append(tp1, 0L, null, bytesWithPoorCompression(random, recordSize), Record.EMPTY_HEADERS, null, 0); + } + + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertFalse(result.readyNodes.isEmpty()); + Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + assertEquals(1, batches.size()); + assertEquals(1, batches.values().iterator().next().size()); + ProducerBatch batch = batches.values().iterator().next().get(0); + int numSplitBatches = accum.splitAndReenqueue(batch); + accum.deallocate(batch); + + return numSplitBatches; + } + + private BatchDrainedResult completeOrSplitBatches(RecordAccumulator accum, int batchSize) { + int numSplit = 0; + int numBatches = 0; + boolean batchDrained; + do { + batchDrained = false; + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + Map<Integer, List<ProducerBatch>> batches = accum.drain(cluster, result.readyNodes, Integer.MAX_VALUE, time.milliseconds()); + for (List<ProducerBatch> batchList : batches.values()) { + for (ProducerBatch batch : batchList) { + batchDrained = true; + numBatches++; + if (batch.sizeInBytes() > batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD) { + accum.splitAndReenqueue(batch); + // release the resource of the original big batch. 
+ numSplit++; + } else { + batch.done(0L, 0L, null); + } + accum.deallocate(batch); + } + } + } while (batchDrained); + return new BatchDrainedResult(numSplit, numBatches); + } + + /** + * Generates the compression ratio at about 0.6 + */ + private byte[] bytesWithGoodCompression(Random random) { + byte[] value = new byte[100]; + ByteBuffer buffer = ByteBuffer.wrap(value); + while (buffer.remaining() > 0) + buffer.putInt(random.nextInt(1000)); + return value; + } + + /** + * Generates the compression ratio at about 0.9 + */ + private byte[] bytesWithPoorCompression(Random random, int size) { + byte[] value = new byte[size]; + random.nextBytes(value); + return value; + } + + private class BatchDrainedResult { + final int numSplit; + final int numBatches; + BatchDrainedResult(int numSplit, int numBatches) { + this.numBatches = numBatches; + this.numSplit = numSplit; + } + } + /** * Return the offset delta. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 1321fba..cc30f4d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.RecordBatch; @@ -531,6 +532,95 @@ public class 
SenderTest { assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionManager.hasProducerId()); } + @Test + public void testSplitBatchAndSend() throws Exception { + int maxRetries = 1; + String topic = "testSplitBatchAndSend"; + // Set a good compression ratio. + CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f); + Metrics m = new Metrics(); + TransactionManager txnManager = new TransactionManager("testSplitBatchAndSend", 0); + txnManager.setProducerIdAndEpoch(new ProducerIdAndEpoch(123456L, (short) 0)); + accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.GZIP, 0L, 0L, m, time, + new ApiVersions(), txnManager); + try { + Sender sender = new Sender(client, + metadata, + this.accumulator, + true, + MAX_REQUEST_SIZE, + ACKS_ALL, + maxRetries, + m, + time, + REQUEST_TIMEOUT, + 1000L, + txnManager, + new ApiVersions()); + // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 + Cluster cluster1 = TestUtils.clusterWith(2, topic, 2); + metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); + // Send the first message. 
+ TopicPartition tp2 = new TopicPartition(topic, 1); + Future<RecordMetadata> f1 = + accumulator.append(tp2, 0L, "key1".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future; + Future<RecordMetadata> f2 = + accumulator.append(tp2, 0L, "key2".getBytes(), new byte[batchSize / 2], null, null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); // connect + sender.run(time.milliseconds()); // send produce request + assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue()); + String id = client.requests().peek().destination(); + assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey()); + Node node = new Node(Integer.valueOf(id), "localhost", 0); + assertEquals(1, client.inFlightRequestCount()); + assertTrue("Client ready status should be true", client.isReady(node, 0L)); + + Map<TopicPartition, ProduceResponse.PartitionResponse> responseMap = new HashMap<>(); + responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE)); + client.respond(new ProduceResponse(responseMap)); + sender.run(time.milliseconds()); // split and reenqueue + // The compression ratio should have been improved once. 
+ assertEquals(CompressionType.GZIP.rate - CompressionRatioEstimator.COMPRESSION_RATIO_IMPROVING_STEP, + CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01); + sender.run(time.milliseconds()); // send produce request + assertEquals("The sequence number should be 0", 0, txnManager.sequenceNumber(tp2).longValue()); + assertFalse("The future shouldn't have been done.", f1.isDone()); + assertFalse("The future shouldn't have been done.", f2.isDone()); + id = client.requests().peek().destination(); + assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey()); + node = new Node(Integer.valueOf(id), "localhost", 0); + assertEquals(1, client.inFlightRequestCount()); + assertTrue("Client ready status should be true", client.isReady(node, 0L)); + + responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L)); + client.respond(new ProduceResponse(responseMap)); + sender.run(time.milliseconds()); // receive + assertTrue("The future should have been done.", f1.isDone()); + assertEquals("The sequence number should be 1", 1, txnManager.sequenceNumber(tp2).longValue()); + assertFalse("The future shouldn't have been done.", f2.isDone()); + assertEquals("Offset of the first message should be 0", 0L, f1.get().offset()); + sender.run(time.milliseconds()); // send produce request + id = client.requests().peek().destination(); + assertEquals(ApiKeys.PRODUCE, client.requests().peek().requestBuilder().apiKey()); + node = new Node(Integer.valueOf(id), "localhost", 0); + assertEquals(1, client.inFlightRequestCount()); + assertTrue("Client ready status should be true", client.isReady(node, 0L)); + + responseMap.put(tp2, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L)); + client.respond(new ProduceResponse(responseMap)); + sender.run(time.milliseconds()); // receive + assertTrue("The future should have been done.", f2.isDone()); + assertEquals("The sequence number should be 2", 2, 
txnManager.sequenceNumber(tp2).longValue()); + assertEquals("Offset of the first message should be 1", 1L, f2.get().offset()); + assertTrue("There should be no batch in the accumulator", accumulator.batches().get(tp2).isEmpty()); + + assertTrue("There should be a split", + m.metrics().get(m.metricName("batch-split-rate", "producer-metrics")).value() > 0); + } finally { + m.close(); + } + } + private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception { assertTrue("Request should be completed", future.isDone()); try { http://git-wip-us.apache.org/repos/asf/kafka/blob/7fad4555/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index c08a2f0..58d4371 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -223,11 +223,11 @@ public class MemoryRecordsBuilderTest { MemoryRecords built = builder.build(); if (compressionType == CompressionType.NONE) { - assertEquals(1.0, builder.compressionRate(), 0.00001); + assertEquals(1.0, builder.compressionRatio(), 0.00001); } else { int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0; double computedCompressionRate = (double) compressedSize / uncompressedSize; - assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001); + assertEquals(computedCompressionRate, builder.compressionRatio(), 0.00001); } } @@ -254,11 +254,11 @@ public class MemoryRecordsBuilderTest { MemoryRecords built = builder.build(); if (compressionType == CompressionType.NONE) { - assertEquals(1.0, builder.compressionRate(), 0.00001); + assertEquals(1.0, 
builder.compressionRatio(), 0.00001); } else { int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V1; double computedCompressionRate = (double) compressedSize / uncompressedSize; - assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001); + assertEquals(computedCompressionRate, builder.compressionRatio(), 0.00001); } } @@ -359,6 +359,7 @@ public class MemoryRecordsBuilderTest { MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + builder.setEstimatedCompressionRatio(0.5f); builder.append(0L, "a".getBytes(), "1".getBytes()); builder.append(1L, "b".getBytes(), "2".getBytes());
