Repository: kafka Updated Branches: refs/heads/trunk fdcee8b8b -> cea319a4a
KAFKA-4935; Deprecate client checksum API and compute lazy partial checksum for magic v2 Author: Jason Gustafson <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]> Closes #3123 from hachikuji/KAFKA-4935 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cea319a4 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cea319a4 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cea319a4 Branch: refs/heads/trunk Commit: cea319a4ad9c55d3d3263cf7a4224c25772d0e11 Parents: fdcee8b Author: Jason Gustafson <[email protected]> Authored: Thu May 25 07:41:51 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu May 25 08:21:01 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerRecord.java | 23 ++++-- .../clients/consumer/internals/Fetcher.java | 2 +- .../kafka/clients/producer/KafkaProducer.java | 4 +- .../kafka/clients/producer/MockProducer.java | 7 +- .../kafka/clients/producer/RecordMetadata.java | 43 +++++++---- .../internals/FutureRecordMetadata.java | 10 +-- .../producer/internals/ProducerBatch.java | 4 +- .../internals/ProducerInterceptors.java | 6 +- .../common/header/internals/RecordHeaders.java | 6 +- .../record/AbstractLegacyRecordBatch.java | 5 ++ .../kafka/common/record/DefaultRecord.java | 67 +++++++---------- .../kafka/common/record/DefaultRecordBatch.java | 3 +- .../common/record/MemoryRecordsBuilder.java | 71 ++++++++++-------- .../org/apache/kafka/common/record/Record.java | 5 +- .../clients/consumer/ConsumerRecordTest.java | 12 +++ .../clients/producer/RecordMetadataTest.java | 79 ++++++++++++++++++++ .../kafka/clients/producer/RecordSendTest.java | 6 +- .../producer/internals/ProducerBatchTest.java | 29 +++++++ .../internals/ProducerInterceptorsTest.java | 2 +- .../internals/RecordAccumulatorTest.java | 8 +- .../common/record/MemoryRecordsBuilderTest.java | 16 
++++ .../kafka/test/MockConsumerInterceptor.java | 3 +- .../scala/kafka/tools/DumpLogSegments.scala | 6 +- docs/upgrade.html | 2 +- 24 files changed, 285 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java index 464091a..7f85246 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.DefaultRecord; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; @@ -36,13 +37,14 @@ public class ConsumerRecord<K, V> { private final long offset; private final long timestamp; private final TimestampType timestampType; - private final long checksum; private final int serializedKeySize; private final int serializedValueSize; private final Headers headers; private final K key; private final V value; + private volatile Long checksum; + /** * Creates a record to be received from a specified topic and partition (provided for * compatibility with Kafka 0.9 before the message format supported timestamps and before @@ -63,7 +65,6 @@ public class ConsumerRecord<K, V> { NULL_CHECKSUM, NULL_SIZE, NULL_SIZE, key, value); } - /** * Creates a record to be received from a specified topic and partition (provided for * compatibility with Kafka 0.10 before the message format supported headers). 
@@ -89,7 +90,8 @@ public class ConsumerRecord<K, V> { int serializedValueSize, K key, V value) { - this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, key, value, new RecordHeaders()); + this(topic, partition, offset, timestamp, timestampType, checksum, serializedKeySize, serializedValueSize, + key, value, new RecordHeaders()); } /** @@ -112,7 +114,7 @@ public class ConsumerRecord<K, V> { long offset, long timestamp, TimestampType timestampType, - long checksum, + Long checksum, int serializedKeySize, int serializedValueSize, K key, @@ -191,8 +193,19 @@ public class ConsumerRecord<K, V> { /** * The checksum (CRC32) of the record. + * + * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the + * checksum returned by the broker may not match what was computed by the producer. + * It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally, + * message format v2 does not include a record-level checksum (for performance, the record checksum + * was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from + * the record timestamp, serialized key size, and serialized value size is returned instead, but + * this should not be depended on for end-to-end reliability. 
*/ + @Deprecated public long checksum() { + if (checksum == null) + this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize); return this.checksum; } @@ -215,7 +228,7 @@ public class ConsumerRecord<K, V> { @Override public String toString() { return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset() - + ", " + timestampType + " = " + timestamp + ", checksum = " + checksum + + ", " + timestampType + " = " + timestamp + ", serialized key size = " + serializedKeySize + ", serialized value size = " + serializedValueSize + ", headers = " + headers http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 6917a1d..a79ea5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -917,7 +917,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes); V value = valueBytes == null ? null : this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray); return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, - timestamp, timestampType, record.checksum(), + timestamp, timestampType, record.checksumOrNull(), keyByteArray == null ? ConsumerRecord.NULL_SIZE : keyByteArray.length, valueByteArray == null ? 
ConsumerRecord.NULL_SIZE : valueByteArray.length, key, value, headers); http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index c11ecc7..22baf3c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -1014,8 +1014,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { public void onCompletion(RecordMetadata metadata, Exception exception) { if (this.interceptors != null) { if (metadata == null) { - this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1), - exception); + this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, + Long.valueOf(-1L), -1, -1), exception); } else { this.interceptors.onAcknowledgement(metadata, exception); } http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java index 22fa755..566e43a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java @@ -251,11 +251,10 @@ public class MockProducer<K, V> implements Producer<K, V> { partition = partition(record, this.cluster); TopicPartition topicPartition = new TopicPartition(record.topic(), partition); ProduceRequestResult result = new 
ProduceRequestResult(topicPartition); - FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0, 0, 0); + FutureRecordMetadata future = new FutureRecordMetadata(result, 0, RecordBatch.NO_TIMESTAMP, 0L, 0, 0); long offset = nextOffset(topicPartition); - Completion completion = new Completion(offset, - new RecordMetadata(topicPartition, 0, offset, RecordBatch.NO_TIMESTAMP, 0, 0, 0), - result, callback); + Completion completion = new Completion(offset, new RecordMetadata(topicPartition, 0, offset, + RecordBatch.NO_TIMESTAMP, Long.valueOf(0L), 0, 0), result, callback); if (!this.transactionInFlight) this.sent.add(record); http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java index 2d06ea8..6757a6d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java @@ -17,7 +17,7 @@ package org.apache.kafka.clients.producer; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.record.DefaultRecord; /** * The metadata for a record that has been acknowledged by the server @@ -36,15 +36,17 @@ public final class RecordMetadata { // user provided one. Otherwise, it will be the producer local time when the producer record was handed to the // producer. 
private final long timestamp; - private final long checksum; private final int serializedKeySize; private final int serializedValueSize; private final TopicPartition topicPartition; - private RecordMetadata(TopicPartition topicPartition, long offset, long timestamp, long - checksum, int serializedKeySize, int serializedValueSize) { - super(); - this.offset = offset; + private volatile Long checksum; + + public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp, + Long checksum, int serializedKeySize, int serializedValueSize) { + // ignore the relativeOffset if the base offset is -1, + // since this indicates the offset is unknown + this.offset = baseOffset == -1 ? baseOffset : baseOffset + relativeOffset; this.timestamp = timestamp; this.checksum = checksum; this.serializedKeySize = serializedKeySize; @@ -52,17 +54,14 @@ public final class RecordMetadata { this.topicPartition = topicPartition; } + /** + * @deprecated As of 0.11.0. Use {@link RecordMetadata#RecordMetadata(TopicPartition, long, long, long, Long, int, int)}. + */ @Deprecated - public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset) { - this(topicPartition, baseOffset, relativeOffset, RecordBatch.NO_TIMESTAMP, -1, -1, -1); - } - - public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, - long timestamp, long checksum, int serializedKeySize, int serializedValueSize) { - // ignore the relativeOffset if the base offset is -1, - // since this indicates the offset is unknown - this(topicPartition, baseOffset == -1 ? 
baseOffset : baseOffset + relativeOffset, - timestamp, checksum, serializedKeySize, serializedValueSize); + public RecordMetadata(TopicPartition topicPartition, long baseOffset, long relativeOffset, long timestamp, + long checksum, int serializedKeySize, int serializedValueSize) { + this(topicPartition, baseOffset, relativeOffset, timestamp, Long.valueOf(checksum), serializedKeySize, + serializedValueSize); } /** @@ -81,8 +80,20 @@ public final class RecordMetadata { /** * The checksum (CRC32) of the record. + * + * @deprecated As of Kafka 0.11.0. Because of the potential for message format conversion on the broker, the + * computed checksum may not match what was stored on the broker, or what will be returned to the consumer. + * It is therefore unsafe to depend on this checksum for end-to-end delivery guarantees. Additionally, + * message format v2 does not include a record-level checksum (for performance, the record checksum + * was replaced with a batch checksum). To maintain compatibility, a partial checksum computed from + * the record timestamp, serialized key size, and serialized value size is returned instead, but + * this should not be depended on for end-to-end reliability. */ + @Deprecated public long checksum() { + if (checksum == null) + // The checksum is null only for message format v2 and above, which do not have a record-level checksum. 
+ this.checksum = DefaultRecord.computePartialChecksum(timestamp, serializedKeySize, serializedValueSize); return this.checksum; } http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java index 1de965f..8fcc46f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java @@ -31,13 +31,13 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { private final ProduceRequestResult result; private final long relativeOffset; private final long createTimestamp; - private final long checksum; + private final Long checksum; private final int serializedKeySize; private final int serializedValueSize; private volatile FutureRecordMetadata nextRecordMetadata = null; public FutureRecordMetadata(ProduceRequestResult result, long relativeOffset, long createTimestamp, - long checksum, int serializedKeySize, int serializedValueSize) { + Long checksum, int serializedKeySize, int serializedValueSize) { this.result = result; this.relativeOffset = relativeOffset; this.createTimestamp = createTimestamp; @@ -96,14 +96,10 @@ public final class FutureRecordMetadata implements Future<RecordMetadata> { return value(); } - long checksum() { + Long checksumOrNull() { return this.checksum; } - long relativeOffset() { - return this.relativeOffset; - } - RecordMetadata value() { if (nextRecordMetadata != null) return nextRecordMetadata.value(); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java index cdf85ce..df79707 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java @@ -103,7 +103,7 @@ public final class ProducerBatch { if (!recordsBuilder.hasRoomFor(timestamp, key, value)) { return null; } else { - long checksum = this.recordsBuilder.append(timestamp, key, value, headers); + Long checksum = this.recordsBuilder.append(timestamp, key, value, headers); this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers)); this.lastAppendTime = now; FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, @@ -131,7 +131,7 @@ public final class ProducerBatch { this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.sizeInBytesUpperBound(magic(), key, value, headers)); FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount, - timestamp, thunk.future.checksum(), + timestamp, thunk.future.checksumOrNull(), key == null ? -1 : key.remaining(), value == null ? -1 : value.remaining()); // Chain the future to the original thunk. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java index e4ab4c6..61a8b7a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java @@ -111,10 +111,10 @@ public class ProducerInterceptors<K, V> implements Closeable { } else { if (interceptTopicPartition == null) { interceptTopicPartition = new TopicPartition(record.topic(), - record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition()); + record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : record.partition()); } - interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1, -1), - exception); + interceptor.onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1, + RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1), exception); } } catch (Exception e) { // do not propagate interceptor exceptions, just log http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java index f23d799..afd991f 100644 --- a/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java +++ b/clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java @@ -54,10 +54,8 
@@ public class RecordHeaders implements Headers { this.headers = new ArrayList<>((Collection<Header>) headers); } else { this.headers = new ArrayList<>(); - Iterator<Header> iterator = headers.iterator(); - while (iterator.hasNext()) { - this.headers.add(iterator.next()); - } + for (Header header : headers) + this.headers.add(header); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 7be4bdd..6ce3ba3 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -110,6 +110,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl } @Override + public Long checksumOrNull() { + return checksum(); + } + + @Override public long checksum() { return outerRecord().checksum(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index 37f92d2..9d0cd7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -77,7 +77,6 @@ public class DefaultRecord implements Record { private final ByteBuffer key; private final ByteBuffer value; private final Header[] headers; - private Long checksum = null; private DefaultRecord(int sizeInBytes, byte attributes, @@ -122,10 
+121,8 @@ public class DefaultRecord implements Record { } @Override - public long checksum() { - if (checksum == null) - checksum = computeChecksum(timestamp, key, value); - return checksum; + public Long checksumOrNull() { + return null; } @Override @@ -174,14 +171,14 @@ public class DefaultRecord implements Record { } /** - * Write the record to `out` and return its crc. + * Write the record to `out` and return its size. */ - public static long writeTo(DataOutputStream out, - int offsetDelta, - long timestampDelta, - ByteBuffer key, - ByteBuffer value, - Header[] headers) throws IOException { + public static int writeTo(DataOutputStream out, + int offsetDelta, + long timestampDelta, + ByteBuffer key, + ByteBuffer value, + Header[] headers) throws IOException { int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers); ByteUtils.writeVarint(sizeInBytes, out); @@ -230,18 +227,18 @@ public class DefaultRecord implements Record { } } - return computeChecksum(timestampDelta, key, value); + return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes; } /** - * Write the record to `out` and return its crc. + * Write the record to `out` and return its size. 
*/ - public static long writeTo(ByteBuffer out, - int offsetDelta, - long timestampDelta, - ByteBuffer key, - ByteBuffer value, - Header[] headers) { + public static int writeTo(ByteBuffer out, + int offsetDelta, + long timestampDelta, + ByteBuffer key, + ByteBuffer value, + Header[] headers) { try { return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta, key, value, headers); @@ -251,24 +248,6 @@ public class DefaultRecord implements Record { } } - /** - * Compute the checksum of the record from the timestamp, key and value payloads - */ - private static long computeChecksum(long timestamp, - ByteBuffer key, - ByteBuffer value) { - Checksum crc = Crc32C.create(); - Checksums.updateLong(crc, timestamp); - - if (key != null) - Checksums.update(crc, key, key.remaining()); - - if (value != null) - Checksums.update(crc, value, value.remaining()); - - return crc.getValue(); - } - @Override public boolean hasMagic(byte magic) { return magic >= MAGIC_VALUE_V2; @@ -493,14 +472,18 @@ public class DefaultRecord implements Record { return size; } - static int recordSizeUpperBound(byte[] key, byte[] value, Header[] headers) { - return recordSizeUpperBound(Utils.wrapNullable(key), Utils.wrapNullable(value), headers); - } - static int recordSizeUpperBound(ByteBuffer key, ByteBuffer value, Header[] headers) { int keySize = key == null ? -1 : key.remaining(); int valueSize = value == null ? 
-1 : value.remaining(); return MAX_RECORD_OVERHEAD + sizeOf(keySize, valueSize, headers); } + + public static long computePartialChecksum(long timestamp, int serializedKeySize, int serializedValueSize) { + Checksum checksum = Crc32C.create(); + Checksums.updateLong(checksum, timestamp); + Checksums.updateInt(checksum, serializedKeySize); + Checksums.updateInt(checksum, serializedValueSize); + return checksum.getValue(); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 589e67c..13f958d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.ByteBufferInputStream; import org.apache.kafka.common.utils.ByteUtils; import org.apache.kafka.common.utils.CloseableIterator; import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Utils; import java.io.DataInputStream; import java.io.IOException; @@ -30,7 +31,6 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; -import org.apache.kafka.common.utils.Utils; import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; @@ -493,4 +493,5 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe } } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 42ae0f8..bc25d75 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -344,7 +344,10 @@ public class MemoryRecordsBuilder { return writtenCompressed; } - private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, + /** + * Append a record and return its checksum for message format v0 and v1, or null for v2 and above. + */ + private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { try { if (isControlRecord != isControlBatch) @@ -363,10 +366,12 @@ public class MemoryRecordsBuilder { if (baseTimestamp == null) baseTimestamp = timestamp; - if (magic > RecordBatch.MAGIC_VALUE_V1) - return appendDefaultRecord(offset, timestamp, key, value, headers); - else + if (magic > RecordBatch.MAGIC_VALUE_V1) { + appendDefaultRecord(offset, timestamp, key, value, headers); + return null; + } else { return appendLegacyRecord(offset, timestamp, key, value); + } } catch (IOException e) { throw new KafkaException("I/O exception when writing to the append stream, closing", e); } @@ -379,9 +384,9 @@ public class MemoryRecordsBuilder { * @param key The record key * @param value The record value * @param headers The record headers if there are any - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) { + public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value, Header[] headers) { return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), headers); } @@ -392,9 +397,9 @@ public class MemoryRecordsBuilder { * @param key The 
record key * @param value The record value * @param headers The record headers if there are any - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { + public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { return appendWithOffset(offset, false, timestamp, key, value, headers); } @@ -404,9 +409,9 @@ public class MemoryRecordsBuilder { * @param timestamp The record timestamp * @param key The record key * @param value The record value - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) { + public Long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) { return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS); } @@ -416,9 +421,9 @@ public class MemoryRecordsBuilder { * @param timestamp The record timestamp * @param key The record key * @param value The record value - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) { + public Long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) { return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS); } @@ -426,21 +431,20 @@ public class MemoryRecordsBuilder { * Append a new record at the given offset. 
* @param offset The absolute offset of the record in the log buffer * @param record The record to append - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long appendWithOffset(long offset, SimpleRecord record) { + public Long appendWithOffset(long offset, SimpleRecord record) { return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers()); } - /** * Append a new record at the next sequential offset. * @param timestamp The record timestamp * @param key The record key * @param value The record value - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long append(long timestamp, ByteBuffer key, ByteBuffer value) { + public Long append(long timestamp, ByteBuffer key, ByteBuffer value) { return append(timestamp, key, value, Record.EMPTY_HEADERS); } @@ -450,9 +454,9 @@ public class MemoryRecordsBuilder { * @param key The record key * @param value The record value * @param headers The record headers if there are any - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { + public Long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) { return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers); } @@ -461,9 +465,9 @@ public class MemoryRecordsBuilder { * @param timestamp The record timestamp * @param key The record key * @param value The record value - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long append(long timestamp, byte[] key, byte[] value) { + public Long append(long timestamp, byte[] key, byte[] value) { return append(timestamp, wrapNullable(key), 
wrapNullable(value), Record.EMPTY_HEADERS); } @@ -473,18 +477,18 @@ public class MemoryRecordsBuilder { * @param key The record key * @param value The record value * @param headers The record headers if there are any - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long append(long timestamp, byte[] key, byte[] value, Header[] headers) { + public Long append(long timestamp, byte[] key, byte[] value, Header[] headers) { return append(timestamp, wrapNullable(key), wrapNullable(value), headers); } /** * Append a new record at the next sequential offset. * @param record The record to append - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - public long append(SimpleRecord record) { + public Long append(SimpleRecord record) { return appendWithOffset(nextSequentialOffset(), record); } @@ -493,9 +497,9 @@ public class MemoryRecordsBuilder { * @param timestamp The record timestamp * @param type The control record type (cannot be UNKNOWN) * @param value The control record value - * @return crc of the record + * @return CRC of the record or null if record-level CRC is not supported for the message format */ - private long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) { + private Long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) { Struct keyStruct = type.recordKey(); ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf()); keyStruct.writeTo(key); @@ -503,7 +507,10 @@ public class MemoryRecordsBuilder { return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS); } - public long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) { + /** + * Return CRC of the record or null if record-level CRC is not supported for the message format + */ + public Long appendEndTxnMarker(long timestamp, 
EndTransactionMarker marker) { if (producerId == RecordBatch.NO_PRODUCER_ID) throw new IllegalArgumentException("End transaction marker requires a valid producerId"); if (!isTransactional) @@ -568,15 +575,13 @@ public class MemoryRecordsBuilder { appendWithOffset(nextSequentialOffset(), record); } - private long appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, + private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { ensureOpenForRecordAppend(); int offsetDelta = (int) (offset - baseOffset); long timestampDelta = timestamp - baseTimestamp; - long crc = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); - // TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size? - recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers)); - return crc; + int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); + recordWritten(offset, timestamp, sizeInBytes); } private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException { http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/main/java/org/apache/kafka/common/record/Record.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java index cba6fc5..6de28c3 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/Record.java +++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java @@ -54,9 +54,10 @@ public interface Record { /** * Get a checksum of the record contents. 
- * @return a 4-byte unsigned checksum represented as a long + * @return A 4-byte unsigned checksum represented as a long or null if the message format does not + * include a checksum (i.e. for v2 and above) */ - long checksum(); + Long checksumOrNull(); /** * Check whether the record has a valid checksum. http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java index a8a5283..3273645 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer; import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.DefaultRecord; import org.apache.kafka.common.record.TimestampType; import org.junit.Test; @@ -25,6 +26,7 @@ import static org.junit.Assert.assertEquals; public class ConsumerRecordTest { @Test + @SuppressWarnings("deprecation") public void testOldConstructor() { String topic = "topic"; int partition = 0; @@ -46,5 +48,15 @@ public class ConsumerRecordTest { assertEquals(new RecordHeaders(), record.headers()); } + @Test + @SuppressWarnings("deprecation") + public void testNullChecksumInConstructor() { + String key = "key"; + String value = "value"; + long timestamp = 242341324L; + ConsumerRecord<String, String> record = new ConsumerRecord<>("topic", 0, 23L, timestamp, + TimestampType.CREATE_TIME, null, key.length(), value.length(), key, value, new RecordHeaders()); + assertEquals(DefaultRecord.computePartialChecksum(timestamp, key.length(), value.length()), record.checksum()); + } } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java new file mode 100644 index 0000000..a735a61 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordMetadataTest.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.producer; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.record.DefaultRecord; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class RecordMetadataTest { + + @Test + @SuppressWarnings("deprecation") + public void testConstructionWithMissingRelativeOffset() { + TopicPartition tp = new TopicPartition("foo", 0); + long timestamp = 2340234L; + int keySize = 3; + int valueSize = 5; + Long checksum = 908923L; + + RecordMetadata metadata = new RecordMetadata(tp, -1L, -1L, timestamp, checksum, keySize, valueSize); + assertEquals(tp.topic(), metadata.topic()); + assertEquals(tp.partition(), metadata.partition()); + assertEquals(timestamp, metadata.timestamp()); + assertEquals(-1L, metadata.offset()); + assertEquals(checksum.longValue(), metadata.checksum()); + assertEquals(keySize, metadata.serializedKeySize()); + assertEquals(valueSize, metadata.serializedValueSize()); + } + + @Test + @SuppressWarnings("deprecation") + public void testConstructionWithRelativeOffset() { + TopicPartition tp = new TopicPartition("foo", 0); + long timestamp = 2340234L; + int keySize = 3; + int valueSize = 5; + long baseOffset = 15L; + long relativeOffset = 3L; + Long checksum = 908923L; + + RecordMetadata metadata = new RecordMetadata(tp, baseOffset, relativeOffset, timestamp, checksum, + keySize, valueSize); + assertEquals(tp.topic(), metadata.topic()); + assertEquals(tp.partition(), metadata.partition()); + assertEquals(timestamp, metadata.timestamp()); + assertEquals(baseOffset + relativeOffset, metadata.offset()); + assertEquals(checksum.longValue(), metadata.checksum()); + assertEquals(keySize, metadata.serializedKeySize()); + assertEquals(valueSize, metadata.serializedValueSize()); + } + + @Test + @SuppressWarnings("deprecation") + public void testNullChecksum() { + long timestamp = 2340234L; + int keySize = 3; + int valueSize = 5; + RecordMetadata metadata = new RecordMetadata(new 
TopicPartition("foo", 0), 15L, 3L, timestamp, null, + keySize, valueSize); + assertEquals(DefaultRecord.computePartialChecksum(timestamp, keySize, valueSize), metadata.checksum()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java index 90ff16a..c083db3 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordSendTest.java @@ -46,7 +46,7 @@ public class RecordSendTest { public void testTimeout() throws Exception { ProduceRequestResult request = new ProduceRequestResult(topicPartition); FutureRecordMetadata future = new FutureRecordMetadata(request, relOffset, - RecordBatch.NO_TIMESTAMP, 0, 0, 0); + RecordBatch.NO_TIMESTAMP, 0L, 0, 0); assertFalse("Request is not completed", future.isDone()); try { future.get(5, TimeUnit.MILLISECONDS); @@ -66,7 +66,7 @@ public class RecordSendTest { @Test(expected = ExecutionException.class) public void testError() throws Exception { FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, new CorruptRecordException(), 50L), - relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0); + relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0); future.get(); } @@ -76,7 +76,7 @@ public class RecordSendTest { @Test public void testBlocking() throws Exception { FutureRecordMetadata future = new FutureRecordMetadata(asyncRequest(baseOffset, null, 50L), - relOffset, RecordBatch.NO_TIMESTAMP, 0, 0, 0); + relOffset, RecordBatch.NO_TIMESTAMP, 0L, 0, 0); assertEquals(baseOffset + relOffset, future.get().offset()); } 
http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index fede528..da93015 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -18,17 +18,21 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.LegacyRecord; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.TimestampType; import org.junit.Test; import java.nio.ByteBuffer; +import java.util.Arrays; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; public class ProducerBatchTest { @@ -38,6 +42,31 @@ public class ProducerBatchTest { private final MemoryRecordsBuilder memoryRecordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(128), CompressionType.NONE, TimestampType.CREATE_TIME, 128); + @Test + public void testChecksumNullForMagicV2() { + ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); + FutureRecordMetadata future = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + assertNotNull(future); + assertNull(future.checksumOrNull()); + } 
+ + @Test + public void testAppendedChecksumMagicV0AndV1() { + for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1)) { + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(128), magic, + CompressionType.NONE, TimestampType.CREATE_TIME, 128); + ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), builder, now); + byte[] key = "hi".getBytes(); + byte[] value = "there".getBytes(); + + FutureRecordMetadata future = batch.tryAppend(now, key, value, Record.EMPTY_HEADERS, null, now); + assertNotNull(future); + byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME); + long expectedChecksum = LegacyRecord.computeChecksum(magic, attributes, now, key, value); + assertEquals(expectedChecksum, future.checksumOrNull().longValue()); + } + } + /** * A {@link ProducerBatch} configured using a very large linger value and a timestamp preceding its create * time is interpreted correctly as not expired when the linger time is larger than the difference http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java index 32b3ddb..9eeb114 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java @@ -144,7 +144,7 @@ public class ProducerInterceptorsTest { ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList); // verify onAck is called on all interceptors - RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 
0, 0, 0); + RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, Long.valueOf(0L), 0, 0); interceptors.onAcknowledgement(meta, null); assertEquals(2, onAckCount); http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java index b9675c3..f48ab33 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java @@ -16,9 +16,6 @@ */ package org.apache.kafka.clients.producer.internals; -import java.util.Random; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import org.apache.kafka.clients.ApiVersions; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.producer.Callback; @@ -33,8 +30,8 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.record.CompressionRatioEstimator; import org.apache.kafka.common.record.CompressionType; -import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.DefaultRecord; +import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.MutableRecordBatch; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.requests.ApiVersionsResponse; @@ -52,7 +49,10 @@ import java.util.Deque; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import 
java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 58d4371..0922c48 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -323,6 +323,22 @@ public class MemoryRecordsBuilderTest { } @Test + public void testAppendedChecksumConsistency() { + ByteBuffer buffer = ByteBuffer.allocate(512); + for (byte magic : Arrays.asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2)) { + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, + TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity()); + Long checksumOrNull = builder.append(1L, "key".getBytes(), "value".getBytes()); + MemoryRecords memoryRecords = builder.build(); + List<Record> records = TestUtils.toList(memoryRecords.records()); + assertEquals(1, records.size()); + assertEquals(checksumOrNull, records.get(0).checksumOrNull()); + } + } + + @Test public void testSmallWriteLimit() { // with a small write limit, we always allow at least one record to be added http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java ---------------------------------------------------------------------- diff --git 
a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java index 08c8e74..3fcc157 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java +++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java @@ -56,6 +56,7 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume } @Override + @SuppressWarnings("deprecation") public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { // This will ensure that we get the cluster metadata when onConsume is called for the first time @@ -99,4 +100,4 @@ public class MockConsumerInterceptor implements ClusterResourceListener, Consume public void onUpdate(ClusterResource clusterResource) { CLUSTER_META.set(clusterResource); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/core/src/main/scala/kafka/tools/DumpLogSegments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index 4d35a85..f0c41c7 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -353,11 +353,13 @@ object DumpLogSegments { print("offset: " + record.offset + " position: " + validBytes + " " + batch.timestampType + ": " + record.timestamp + " isvalid: " + record.isValid + " keysize: " + record.keySize + " valuesize: " + record.valueSize + " magic: " + batch.magic + - " compresscodec: " + batch.compressionType + " crc: " + record.checksum) + " compresscodec: " + batch.compressionType) if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) { - print(" sequence: " + record.sequence + + print(" crc: " + batch.checksum + " sequence: " + record.sequence + " headerKeys: " + record.headers.map(_.key).mkString("[", ",", "]")) 
+ } else { + print(" crc: " + record.checksumOrNull) } if (batch.isControlBatch) { http://git-wip-us.apache.org/repos/asf/kafka/blob/cea319a4/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index a2d83a6..dab5fa7 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -68,7 +68,7 @@ individual messages is only reduced by the overhead of the batch format. This similarly affects the producer's <code>batch.size</code> configuration.</li> <li>GC log rotation is enabled by default, see KAFKA-3754 for details.</li> - <li>Deprecated constructors of MetricName and Cluster classes have been removed.</li> + <li>Deprecated constructors of RecordMetadata, MetricName and Cluster classes have been removed.</li> <li>Added user headers support through a new Headers interface providing user headers read and write access.</li> <li>ProducerRecord and ConsumerRecord expose the new Headers API via <code>Headers headers()</code> method call.</li> <li>ExtendedSerializer and ExtendedDeserializer interfaces are introduced to support serialization and deserialization for headers. Headers will be ignored if the configured serializer and deserializer are not the above classes.</li>
