KAFKA-5121; Implement transaction index for KIP-98

Author: Jason Gustafson <[email protected]>
Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]>

Closes #2910 from hachikuji/eos-txn-index

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e71dce89
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e71dce89
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e71dce89

Branch: refs/heads/trunk
Commit: e71dce89c0da50f3eccc47d0fc050c92d5a99b88
Parents: 29994dd
Author: Jason Gustafson <[email protected]>
Authored: Sat May 6 11:49:35 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Sat May 6 11:49:35 2017 -0700

----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 79 +--
.../TransactionCoordinatorFencedException.java | 30 +
.../apache/kafka/common/protocol/Errors.java | 17 +-
.../apache/kafka/common/protocol/Protocol.java | 17 +-
.../record/AbstractLegacyRecordBatch.java | 2 +-
.../kafka/common/record/AbstractRecords.java | 9 +-
.../kafka/common/record/ControlRecordType.java | 24 +-
.../kafka/common/record/DefaultRecord.java | 50 +-
.../kafka/common/record/DefaultRecordBatch.java | 22 +-
.../common/record/EndTransactionMarker.java | 124 ++++
.../kafka/common/record/FileLogInputStream.java | 6 +
.../apache/kafka/common/record/FileRecords.java | 13 +-
.../kafka/common/record/MemoryRecords.java | 109 +++-
.../common/record/MemoryRecordsBuilder.java | 66 +-
.../org/apache/kafka/common/record/Record.java | 9 -
.../apache/kafka/common/record/RecordBatch.java | 8 +
.../common/requests/ListOffsetRequest.java | 34 +-
.../org/apache/kafka/common/utils/Utils.java | 5 +
.../clients/consumer/internals/FetcherTest.java | 97 +--
.../common/record/DefaultRecordBatchTest.java | 31 +-
.../kafka/common/record/DefaultRecordTest.java | 45 +-
.../common/record/EndTransactionMarkerTest.java | 70 +++
.../kafka/common/record/FileRecordsTest.java | 11 +-
.../common/record/MemoryRecordsBuilderTest.java | 93 ++-
.../kafka/common/record/MemoryRecordsTest.java | 80 ++-
.../common/requests/RequestResponseTest.java | 22 +-
.../main/scala/kafka/cluster/Partition.scala | 4 +-
core/src/main/scala/kafka/cluster/Replica.scala | 23 +-
.../group/GroupMetadataManager.scala | 27 +-
.../transaction/TransactionStateManager.scala | 19 +-
.../main/scala/kafka/log/AbstractIndex.scala | 23 +-
core/src/main/scala/kafka/log/Log.scala | 611 ++++++++++++-------
core/src/main/scala/kafka/log/LogCleaner.scala | 34 +-
core/src/main/scala/kafka/log/LogManager.scala | 6 +-
core/src/main/scala/kafka/log/LogSegment.scala | 132 ++--
.../src/main/scala/kafka/log/LogValidator.scala | 83 +--
core/src/main/scala/kafka/log/OffsetIndex.scala | 20 +-
.../scala/kafka/log/ProducerIdMapping.scala | 384 ------------
.../scala/kafka/log/ProducerStateManager.scala | 590 ++++++++++++++++++
core/src/main/scala/kafka/log/TimeIndex.scala | 7 +-
.../main/scala/kafka/log/TransactionIndex.scala | 243 ++++++++
.../main/scala/kafka/server/DelayedFetch.scala | 14 +-
.../main/scala/kafka/server/FetchDataInfo.scala | 4 +-
.../src/main/scala/kafka/server/KafkaApis.scala | 67 +-
.../scala/kafka/server/LogOffsetMetadata.scala | 8 +-
.../scala/kafka/server/ReplicaManager.scala | 31 +-
.../scala/kafka/tools/DumpLogSegments.scala | 29 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 2 +-
.../group/GroupCoordinatorResponseTest.scala | 6 +-
.../group/GroupMetadataManagerTest.scala | 22 +-
.../TransactionStateManagerTest.scala | 7 +-
.../unit/kafka/log/BrokerCompressionTest.scala | 2 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 2 +- .../log/LogCleanerLagIntegrationTest.scala | 3 +- .../scala/unit/kafka/log/LogCleanerTest.scala | 2 +- .../scala/unit/kafka/log/LogManagerTest.scala | 5 +- .../scala/unit/kafka/log/LogSegmentTest.scala | 104 +++- .../src/test/scala/unit/kafka/log/LogTest.scala | 555 +++++++++++++++-- .../scala/unit/kafka/log/LogValidatorTest.scala | 209 +++++-- .../scala/unit/kafka/log/OffsetIndexTest.scala | 24 +- .../unit/kafka/log/ProducerIdMappingTest.scala | 291 --------- .../kafka/log/ProducerStateManagerTest.scala | 562 +++++++++++++++++ .../unit/kafka/log/TransactionIndexTest.scala | 173 ++++++ .../scala/unit/kafka/server/LogOffsetTest.scala | 12 +- .../kafka/server/ReplicaManagerQuotasTest.scala | 7 +- .../unit/kafka/server/ReplicaManagerTest.scala | 151 ++++- .../unit/kafka/server/RequestQuotaTest.scala | 7 +- .../unit/kafka/server/SimpleFetchTest.scala | 7 +- .../test/scala/unit/kafka/utils/TestUtils.scala | 2 +- 69 files changed, 4099 insertions(+), 1488 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 0c5c385..dc6c338 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; @@ -669,7 +668,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node, final Map<TopicPartition, Long> timestampsToSearch, boolean requireTimestamp) { - ListOffsetRequest.Builder builder = ListOffsetRequest.Builder.forConsumer(requireTimestamp) + ListOffsetRequest.Builder builder = ListOffsetRequest.Builder + .forConsumer(requireTimestamp, isolationLevel) .setTargetTimes(timestampsToSearch); log.trace("Sending ListOffsetRequest {} to broker {}", builder, node); @@ -1003,12 +1003,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { return null; } currentBatch = batches.next(); - maybeEnsureValid(currentBatch); - if (isolationLevel == IsolationLevel.READ_COMMITTED && isBatchAborted(currentBatch)) { - nextFetchOffset = currentBatch.lastOffset() + 1; - continue; + if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) { + long producerId = currentBatch.producerId(); + if (containsAbortMarker(currentBatch)) { + abortedProducerIds.remove(producerId); + } else if (isBatchAborted(currentBatch)) { + log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}", + producerId, currentBatch.baseOffset(), partition); + nextFetchOffset = currentBatch.lastOffset() + 1; + continue; + } } records = 
currentBatch.streamingIterator(); @@ -1022,7 +1028,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { nextFetchOffset = record.offset() + 1; // control records are not returned to the user - if (!record.isControlRecord()) + if (!currentBatch.isControlBatch()) return record; } } @@ -1046,7 +1052,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } private boolean isBatchAborted(RecordBatch batch) { - /* When in READ_COMMITTED mode, we need to do the following for each incoming entry: + /* When in READ_COMMITTED mode, we need to do the following for each incoming entry: * 0. Check whether the pid is in the 'abortedProducerIds' set && the entry does not include an abort marker. * If so, skip the entry. * 1. If the pid is in aborted pids and the entry contains an abort marker, remove the pid from @@ -1056,47 +1062,48 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { * this means that the entry has been aborted. Add the pid to the aborted pids set, and remove * the entry from the abort index. */ - FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek(); - if (abortedProducerIds.contains(batch.producerId()) - || (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset())) { - if (abortedProducerIds.contains(batch.producerId()) && containsAbortMarker(batch)) { - abortedProducerIds.remove(batch.producerId()); - } else if (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset()) { - abortedProducerIds.add(batch.producerId()); + long producerId = batch.producerId(); + if (abortedProducerIds.contains(producerId)) { + return true; + } else if (abortedTransactions != null && !abortedTransactions.isEmpty()) { + FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek(); + if (nextAbortedTransaction.producerId == producerId && nextAbortedTransaction.firstOffset <= batch.baseOffset()) { + abortedProducerIds.add(producerId); abortedTransactions.poll(); + return true; } - log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}", batch.producerId(), batch.baseOffset(), partition); - return true; } return false; } private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) { - PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = null; - if (partition.abortedTransactions != null && !partition.abortedTransactions.isEmpty()) { - abortedTransactions = new PriorityQueue<>( - partition.abortedTransactions.size(), - new Comparator<FetchResponse.AbortedTransaction>() { - @Override - public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) { - return Long.compare(o1.firstOffset, o2.firstOffset); - } + if (partition.abortedTransactions == null || partition.abortedTransactions.isEmpty()) + return null; + + PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = new PriorityQueue<>( + partition.abortedTransactions.size(), + new Comparator<FetchResponse.AbortedTransaction>() { + @Override + public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) { + return Long.compare(o1.firstOffset, o2.firstOffset); } - ); - abortedTransactions.addAll(partition.abortedTransactions); - } else { - 
abortedTransactions = new PriorityQueue<>(); - } + } + ); + abortedTransactions.addAll(partition.abortedTransactions); return abortedTransactions; } private boolean containsAbortMarker(RecordBatch batch) { + if (!batch.isControlBatch()) + return false; + Iterator<Record> batchIterator = batch.iterator(); - Record firstRecord = batchIterator.hasNext() ? batchIterator.next() : null; - boolean containsAbortMarker = firstRecord != null && firstRecord.isControlRecord() && ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key()); - if (containsAbortMarker && batchIterator.hasNext()) - throw new CorruptRecordException("A record batch containing a control message contained more than one record. partition: " + partition + ", offset: " + batch.baseOffset()); - return containsAbortMarker; + if (!batchIterator.hasNext()) + throw new InvalidRecordException("Invalid batch for partition " + partition + " at offset " + + batch.baseOffset() + " with control sequence set, but no records"); + + Record firstRecord = batchIterator.next(); + return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java new file mode 100644 index 0000000..583ce04 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/TransactionCoordinatorFencedException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
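As a reading aid for the rewritten Fetcher logic above, here is a small, self-contained sketch of the READ_COMMITTED filtering algorithm: abort markers release a producer id, and aborted ranges are consumed from a queue ordered by first offset. The AbortedTx and Batch classes are simplified stand-ins for FetchResponse.AbortedTransaction and RecordBatch; commit markers and offset bookkeeping are omitted, and none of this code is part of the patch itself.

    import java.util.*;

    public class ReadCommittedFilterSketch {
        // Simplified stand-in for FetchResponse.AbortedTransaction
        static final class AbortedTx {
            final long producerId;
            final long firstOffset;
            AbortedTx(long producerId, long firstOffset) {
                this.producerId = producerId;
                this.firstOffset = firstOffset;
            }
        }

        // Simplified stand-in for the parts of RecordBatch the filter needs
        static final class Batch {
            final long producerId;
            final long baseOffset;
            final boolean isAbortMarker; // control batch carrying an ABORT marker
            Batch(long producerId, long baseOffset, boolean isAbortMarker) {
                this.producerId = producerId;
                this.baseOffset = baseOffset;
                this.isAbortMarker = isAbortMarker;
            }
        }

        // Batches a READ_COMMITTED consumer would hand back to the application
        static List<Batch> filter(List<Batch> batches, List<AbortedTx> abortedTxns) {
            PriorityQueue<AbortedTx> pending =
                new PriorityQueue<>(Comparator.comparingLong((AbortedTx a) -> a.firstOffset));
            pending.addAll(abortedTxns);

            Set<Long> abortedProducerIds = new HashSet<>();
            List<Batch> visible = new ArrayList<>();
            for (Batch batch : batches) {
                if (batch.isAbortMarker) {
                    // The abort marker ends the aborted transaction for this producer;
                    // control batches themselves are never returned to the user.
                    abortedProducerIds.remove(batch.producerId);
                    continue;
                }
                AbortedTx next = pending.peek();
                if (next != null && next.producerId == batch.producerId && next.firstOffset <= batch.baseOffset) {
                    abortedProducerIds.add(next.producerId);
                    pending.poll();
                }
                if (abortedProducerIds.contains(batch.producerId))
                    continue; // batch belongs to an aborted transaction: skip it
                visible.add(batch);
            }
            return visible;
        }

        public static void main(String[] args) {
            List<AbortedTx> aborted = Collections.singletonList(new AbortedTx(1L, 0L));
            List<Batch> batches = Arrays.asList(
                new Batch(1L, 0L, false), // aborted data
                new Batch(1L, 5L, true),  // abort marker
                new Batch(1L, 6L, false)  // data from a later, committed transaction
            );
            System.out.println(filter(batches, aborted).size()); // prints 1
        }
    }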
+ */ +package org.apache.kafka.common.errors; + +public class TransactionCoordinatorFencedException extends ApiException { + + private static final long serialVersionUID = 1L; + + public TransactionCoordinatorFencedException(String message) { + super(message); + } + + public TransactionCoordinatorFencedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 65bec4a..960fdda 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -21,11 +21,11 @@ import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.ConcurrentTransactionsException; import org.apache.kafka.common.errors.ControllerMovedException; +import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; +import org.apache.kafka.common.errors.CoordinatorNotAvailableException; import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.errors.DuplicateSequenceNumberException; import org.apache.kafka.common.errors.GroupAuthorizationException; -import org.apache.kafka.common.errors.CoordinatorNotAvailableException; -import org.apache.kafka.common.errors.CoordinatorLoadInProgressException; import org.apache.kafka.common.errors.IllegalGenerationException; import org.apache.kafka.common.errors.IllegalSaslStateException; import org.apache.kafka.common.errors.InconsistentGroupProtocolException; @@ -39,12 +39,11 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException; import org.apache.kafka.common.errors.InvalidReplicationFactorException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidRequiredAcksException; -import org.apache.kafka.common.errors.InvalidTxnTimeoutException; -import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.InvalidSessionTimeoutException; import org.apache.kafka.common.errors.InvalidTimestampException; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.InvalidTxnTimeoutException; import org.apache.kafka.common.errors.LeaderNotAvailableException; import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.NotControllerException; @@ -54,6 +53,7 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.NotLeaderForPartitionException; import org.apache.kafka.common.errors.OffsetMetadataTooLarge; import org.apache.kafka.common.errors.OffsetOutOfRangeException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.PolicyViolationException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RebalanceInProgressException; @@ -64,6 +64,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TimeoutException; import 
org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TopicExistsException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -460,6 +461,14 @@ public enum Errors { public ApiException build(String message) { return new ConcurrentTransactionsException(message); } + }), + TRANSACTION_COORDINATOR_FENCED(52, "Indicates that the transaction coordinator sending a WriteTxnMarker " + + "is no longer the current coordinator for a given producer", + new ApiExceptionBuilder() { + @Override + public ApiException build(String message) { + return new TransactionCoordinatorFencedException(message); + } }); private interface ApiExceptionBuilder { http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 16ec9ea..fb3c8c9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -467,8 +467,21 @@ public class Protocol { new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), "Topics to list offsets.")); - /* v2 request is the same as v1. Throttle time has been added to response */ - public static final Schema LIST_OFFSET_REQUEST_V2 = LIST_OFFSET_REQUEST_V1; + public static final Schema LIST_OFFSET_REQUEST_V2 = new Schema( + new Field("replica_id", + INT32, + "Broker id of the follower. For normal consumers, use -1."), + new Field("isolation_level", + INT8, + "This setting controls the visibility of transactional records. Using READ_UNCOMMITTED " + + "(isolation_level = 0) makes all records visible. With READ_COMMITTED (isolation_level = 1), " + + "non-transactional and COMMITTED transactional records are visible. 
To be more concrete, " + + "READ_COMMITTED returns all data from offsets smaller than the current LSO (last stable offset), " + + "and enables the inclusion of the list of aborted transactions in the result, which allows " + + "consumers to discard ABORTED transactional records"), + new Field("topics", + new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), + "Topics to list offsets."));; public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java index 85fcb2a..7be4bdd 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java @@ -206,7 +206,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl } @Override - public boolean isControlRecord() { + public boolean isControlBatch() { return false; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java index 87df7e4..cfda8a4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java @@ -56,6 +56,9 @@ public abstract class AbstractRecords implements Records { int totalSizeEstimate = 0; for (RecordBatch batch : batches) { + if (toMagic < RecordBatch.MAGIC_VALUE_V2 && batch.isControlBatch()) + continue; + if (batch.magic() <= toMagic) { totalSizeEstimate += batch.sizeInBytes(); recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null)); @@ -94,12 +97,8 @@ public abstract class AbstractRecords implements Records { MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, batch.compressionType(), timestampType, recordBatchAndRecords.baseOffset, logAppendTime); - for (Record record : recordBatchAndRecords.records) { - // control messages are only supported in v2 and above, so skip when down-converting - if (magic < RecordBatch.MAGIC_VALUE_V2 && record.isControlRecord()) - continue; + for (Record record : recordBatchAndRecords.records) builder.append(record); - } builder.close(); return builder.buffer(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java index 790b2ee..d5ead14 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java +++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordType.java @@ -49,6 +49,7 @@ public enum ControlRecordType { private static final Logger log = LoggerFactory.getLogger(ControlRecordType.class); static final short 
CURRENT_CONTROL_RECORD_KEY_VERSION = 0; + static final int CURRENT_CONTROL_RECORD_KEY_SIZE = 4; private static final Schema CONTROL_RECORD_KEY_SCHEMA_VERSION_V0 = new Schema( new Field("version", Type.INT16), new Field("type", Type.INT16)); @@ -69,13 +70,24 @@ public enum ControlRecordType { return struct; } - public static ControlRecordType parse(ByteBuffer key) { + public static short parseTypeId(ByteBuffer key) { + if (key.remaining() < CURRENT_CONTROL_RECORD_KEY_SIZE) + throw new InvalidRecordException("Invalid value size found for end control record key. Must have " + + "at least " + CURRENT_CONTROL_RECORD_KEY_SIZE + " bytes, but found only " + key.remaining()); + short version = key.getShort(0); + if (version < 0) + throw new InvalidRecordException("Invalid version found for control record: " + version + + ". May indicate data corruption"); + if (version != CURRENT_CONTROL_RECORD_KEY_VERSION) - log.debug("Received unknown control record key version {}. Parsing as version {}", version, + log.debug("Received unknown control record key version {}. Parsing as version {}", version, CURRENT_CONTROL_RECORD_KEY_VERSION); - short type = key.getShort(2); - switch (type) { + return key.getShort(2); + } + + public static ControlRecordType fromTypeId(short typeId) { + switch (typeId) { case 0: return ABORT; case 1: @@ -84,4 +96,8 @@ public enum ControlRecordType { return UNKNOWN; } } + + public static ControlRecordType parse(ByteBuffer key) { + return fromTypeId(parseTypeId(key)); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java index e0794d8..669c75d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java @@ -55,11 +55,9 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable; * * The current record attributes are depicted below: * - * ----------------------------------- - * | Unused (1-7) | Control Flag (0) | - * ----------------------------------- - * - * The control flag is used to implement control records (see {@link ControlRecordType}). + * ---------------- + * | Unused (0-7) | + * ---------------- * * The offset and timestamp deltas compute the difference relative to the base offset and * base timestamp of the log entry that this record is contained in. @@ -69,7 +67,6 @@ public class DefaultRecord implements Record { // excluding key, value and headers: 5 bytes length + 10 bytes timestamp + 5 bytes offset + 1 byte attributes public static final int MAX_RECORD_OVERHEAD = 21; - private static final int CONTROL_FLAG_MASK = 0x01; private static final int NULL_VARINT_SIZE_BYTES = ByteUtils.sizeOfVarint(-1); private final int sizeInBytes; @@ -180,7 +177,6 @@ public class DefaultRecord implements Record { * Write the record to `out` and return its crc. 
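For reference, the 4-byte control record key that parseTypeId validates above is simply a version (int16) followed by a type (int16). A minimal sketch, assuming the patched clients classes are on the classpath (the example class itself is hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ControlRecordType;

    public class ControlKeyExample {
        public static void main(String[] args) {
            ByteBuffer key = ByteBuffer.allocate(4);
            key.putShort((short) 0); // key version 0
            key.putShort((short) 0); // type 0 = ABORT, type 1 = COMMIT
            key.flip();
            System.out.println(ControlRecordType.parse(key)); // prints ABORT
        }
    }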
*/ public static long writeTo(DataOutputStream out, - boolean isControlRecord, int offsetDelta, long timestampDelta, ByteBuffer key, @@ -189,7 +185,7 @@ public class DefaultRecord implements Record { int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers); ByteUtils.writeVarint(sizeInBytes, out); - byte attributes = computeAttributes(isControlRecord); + byte attributes = 0; // there are no used record attributes at the moment out.write(attributes); ByteUtils.writeVarlong(timestampDelta, out); @@ -241,15 +237,14 @@ public class DefaultRecord implements Record { * Write the record to `out` and return its crc. */ public static long writeTo(ByteBuffer out, - boolean isControlRecord, int offsetDelta, long timestampDelta, ByteBuffer key, ByteBuffer value, Header[] headers) { try { - return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), isControlRecord, offsetDelta, - timestampDelta, key, value, headers); + return writeTo(new DataOutputStream(new ByteBufferOutputStream(out)), offsetDelta, timestampDelta, + key, value, headers); } catch (IOException e) { // cannot actually be raised by ByteBufferOutputStream throw new IllegalStateException("Unexpected exception raised from ByteBufferOutputStream", e); @@ -290,11 +285,6 @@ public class DefaultRecord implements Record { } @Override - public boolean isControlRecord() { - return (attributes & CONTROL_FLAG_MASK) != 0; - } - - @Override public String toString() { return String.format("DefaultRecord(offset=%d, timestamp=%d, key=%d bytes, value=%d bytes)", offset, @@ -421,10 +411,6 @@ public class DefaultRecord implements Record { return new DefaultRecord(sizeInBytes, attributes, offset, timestamp, sequence, key, value, headers); } - private static byte computeAttributes(boolean isControlRecord) { - return isControlRecord ? CONTROL_FLAG_MASK : (byte) 0; - } - public static int sizeInBytes(int offsetDelta, long timestampDelta, byte[] key, @@ -441,19 +427,35 @@ public class DefaultRecord implements Record { return bodySize + ByteUtils.sizeOfVarint(bodySize); } + public static int sizeInBytes(int offsetDelta, + long timestampDelta, + int keySize, + int valueSize, + Header[] headers) { + int bodySize = sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers); + return bodySize + ByteUtils.sizeOfVarint(bodySize); + } + private static int sizeOfBodyInBytes(int offsetDelta, long timestampDelta, ByteBuffer key, ByteBuffer value, Header[] headers) { - int size = 1; // always one byte for attributes - size += ByteUtils.sizeOfVarint(offsetDelta); - size += ByteUtils.sizeOfVarlong(timestampDelta); int keySize = key == null ? -1 : key.remaining(); int valueSize = value == null ? 
-1 : value.remaining(); - size += sizeOf(keySize, valueSize, headers); + return sizeOfBodyInBytes(offsetDelta, timestampDelta, keySize, valueSize, headers); + } + private static int sizeOfBodyInBytes(int offsetDelta, + long timestampDelta, + int keySize, + int valueSize, + Header[] headers) { + int size = 1; // always one byte for attributes + size += ByteUtils.sizeOfVarint(offsetDelta); + size += ByteUtils.sizeOfVarlong(timestampDelta); + size += sizeOf(keySize, valueSize, headers); return size; } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index 93cd2eb..f321c3b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -62,9 +62,9 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD; * * The current attributes are given below: * - * ----------------------------------------------------------------------------------- - * | Unused (5-15) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | - * ----------------------------------------------------------------------------------- + * ------------------------------------------------------------------------------------------------- + * | Unused (6-15) | Control (5) | Transactional (4) | Timestamp Type (3) | Compression Type (0-2) | + * ------------------------------------------------------------------------------------------------- */ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRecordBatch { static final int BASE_OFFSET_OFFSET = 0; @@ -98,6 +98,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe private static final byte COMPRESSION_CODEC_MASK = 0x07; private static final byte TRANSACTIONAL_FLAG_MASK = 0x10; + private static final int CONTROL_FLAG_MASK = 0x20; private static final byte TIMESTAMP_TYPE_MASK = 0x08; private final ByteBuffer buffer; @@ -203,6 +204,11 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe } @Override + public boolean isControlBatch() { + return (attributes() & CONTROL_FLAG_MASK) > 0; + } + + @Override public int partitionLeaderEpoch() { return buffer.getInt(PARTITION_LEADER_EPOCH_OFFSET); } @@ -284,7 +290,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe if (timestampType() == timestampType && currentMaxTimestamp == maxTimestamp) return; - byte attributes = computeAttributes(compressionType(), timestampType, isTransactional()); + byte attributes = computeAttributes(compressionType(), timestampType, isTransactional(), isControlBatch()); buffer.putShort(ATTRIBUTES_OFFSET, attributes); buffer.putLong(MAX_TIMESTAMP_OFFSET, maxTimestamp); long crc = computeChecksum(); @@ -330,12 +336,15 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe return buffer != null ? 
buffer.hashCode() : 0; } - private static byte computeAttributes(CompressionType type, TimestampType timestampType, boolean isTransactional) { + private static byte computeAttributes(CompressionType type, TimestampType timestampType, + boolean isTransactional, boolean isControl) { if (timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("Timestamp type must be provided to compute attributes for message " + "format v2 and above"); byte attributes = isTransactional ? TRANSACTIONAL_FLAG_MASK : 0; + if (isControl) + attributes |= CONTROL_FLAG_MASK; if (type.id > 0) attributes |= COMPRESSION_CODEC_MASK & type.id; if (timestampType == TimestampType.LOG_APPEND_TIME) @@ -356,6 +365,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe short epoch, int sequence, boolean isTransactional, + boolean isControlBatch, int partitionLeaderEpoch, int numRecords) { if (magic < RecordBatch.CURRENT_MAGIC_VALUE) @@ -363,7 +373,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe if (baseTimestamp < 0 && baseTimestamp != NO_TIMESTAMP) throw new IllegalArgumentException("Invalid message timestamp " + baseTimestamp); - short attributes = computeAttributes(compressionType, timestampType, isTransactional); + short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch); int position = buffer.position(); buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset); http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java new file mode 100644 index 0000000..726b52a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/record/EndTransactionMarker.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.record; + +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; + +/** + * This class represents the control record which is written to the log to indicate the completion + * of a transaction. The record key specifies the {@link ControlRecordType control type} and the + * value embeds information useful for write validation (for now, just the coordinator epoch). 
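The new control bit sits alongside the existing v2 batch attributes. A small standalone illustration of how the attributes byte is assembled (mask values copied from DefaultRecordBatch; the helper class is hypothetical and not part of the patch):

    public class BatchAttributesExample {
        // bits 0-2: compression codec, bit 3: timestamp type, bit 4: transactional, bit 5: control
        static final byte COMPRESSION_CODEC_MASK = 0x07;
        static final byte TIMESTAMP_TYPE_MASK = 0x08;
        static final byte TRANSACTIONAL_FLAG_MASK = 0x10;
        static final byte CONTROL_FLAG_MASK = 0x20;

        static byte attributes(int compressionId, boolean logAppendTime, boolean transactional, boolean control) {
            byte attrs = (byte) (compressionId & COMPRESSION_CODEC_MASK);
            if (logAppendTime) attrs |= TIMESTAMP_TYPE_MASK;
            if (transactional) attrs |= TRANSACTIONAL_FLAG_MASK;
            if (control) attrs |= CONTROL_FLAG_MASK;
            return attrs;
        }

        public static void main(String[] args) {
            // an uncompressed, CREATE_TIME, transactional control batch: 0x10 | 0x20 = 0x30
            System.out.printf("0x%02x%n", attributes(0, false, true, true));
        }
    }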
+ */ +public class EndTransactionMarker { + private static final Logger log = LoggerFactory.getLogger(EndTransactionMarker.class); + + private static final short CURRENT_END_TXN_MARKER_VERSION = 0; + private static final Schema END_TXN_MARKER_SCHEMA_VERSION_V0 = new Schema( + new Field("version", Type.INT16), + new Field("coordinator_epoch", Type.INT32)); + static final int CURRENT_END_TXN_MARKER_VALUE_SIZE = 6; + static final int CURRENT_END_TXN_SCHEMA_RECORD_SIZE = DefaultRecord.sizeInBytes(0, 0L, + ControlRecordType.CURRENT_CONTROL_RECORD_KEY_SIZE, + EndTransactionMarker.CURRENT_END_TXN_MARKER_VALUE_SIZE, + Record.EMPTY_HEADERS); + + private final ControlRecordType type; + private final int coordinatorEpoch; + + public EndTransactionMarker(ControlRecordType type, int coordinatorEpoch) { + ensureTransactionMarkerControlType(type); + this.type = type; + this.coordinatorEpoch = coordinatorEpoch; + } + + public int coordinatorEpoch() { + return coordinatorEpoch; + } + + public ControlRecordType controlType() { + return type; + } + + private Struct buildRecordValue() { + Struct struct = new Struct(END_TXN_MARKER_SCHEMA_VERSION_V0); + struct.set("version", CURRENT_END_TXN_MARKER_VERSION); + struct.set("coordinator_epoch", coordinatorEpoch); + return struct; + } + + public ByteBuffer serializeValue() { + Struct valueStruct = buildRecordValue(); + ByteBuffer value = ByteBuffer.allocate(valueStruct.sizeOf()); + valueStruct.writeTo(value); + value.flip(); + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + EndTransactionMarker that = (EndTransactionMarker) o; + return coordinatorEpoch == that.coordinatorEpoch && type == that.type; + } + + @Override + public int hashCode() { + int result = type != null ? type.hashCode() : 0; + result = 31 * result + coordinatorEpoch; + return result; + } + + private static void ensureTransactionMarkerControlType(ControlRecordType type) { + if (type != ControlRecordType.COMMIT && type != ControlRecordType.ABORT) + throw new IllegalArgumentException("Invalid control record type for end transaction marker" + type); + } + + public static EndTransactionMarker deserialize(Record record) { + ControlRecordType type = ControlRecordType.parse(record.key()); + return deserializeValue(type, record.value()); + } + + static EndTransactionMarker deserializeValue(ControlRecordType type, ByteBuffer value) { + ensureTransactionMarkerControlType(type); + + if (value.remaining() < CURRENT_END_TXN_MARKER_VALUE_SIZE) + throw new InvalidRecordException("Invalid value size found for end transaction marker. Must have " + + "at least " + CURRENT_END_TXN_MARKER_VALUE_SIZE + " bytes, but found only " + value.remaining()); + + short version = value.getShort(0); + if (version < 0) + throw new InvalidRecordException("Invalid version found for end transaction marker: " + version + + ". May indicate data corruption"); + + if (version > CURRENT_END_TXN_MARKER_VERSION) + log.debug("Received end transaction marker value version {}. 
Parsing as version {}", version, + CURRENT_END_TXN_MARKER_VERSION); + + int coordinatorEpoch = value.getInt(2); + return new EndTransactionMarker(type, coordinatorEpoch); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java index d5f10dc..1af5527 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java @@ -279,6 +279,12 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil } @Override + public boolean isControlBatch() { + loadUnderlyingRecordBatch(); + return underlying.isControlBatch(); + } + + @Override public int partitionLeaderEpoch() { loadUnderlyingRecordBatch(); return underlying.partitionLeaderEpoch(); http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index dcd7845..16d3777 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -224,7 +224,6 @@ public class FileRecords extends AbstractRecords implements Closeable { " size of this log segment is " + originalSize + " bytes."); if (targetSize < (int) channel.size()) { channel.truncate(targetSize); - channel.position(targetSize); size.set(targetSize); } return originalSize - targetSize; @@ -276,11 +275,11 @@ public class FileRecords extends AbstractRecords implements Closeable { * @param targetOffset The offset to search for. * @param startingPosition The starting position in the file to begin searching from. 
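The marker value that deserializeValue reads back is likewise tiny: a version (int16) followed by the coordinator epoch (int32), 6 bytes in total. A minimal round-trip sketch against the new public API, assuming the patched clients classes (the example class is hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.ControlRecordType;
    import org.apache.kafka.common.record.EndTransactionMarker;

    public class EndTxnMarkerValueExample {
        public static void main(String[] args) {
            EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, 7);
            ByteBuffer value = marker.serializeValue();
            System.out.println(value.remaining()); // 6
            System.out.println(value.getShort(0)); // 0 (marker version)
            System.out.println(value.getInt(2));   // 7 (coordinator epoch)
        }
    }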
*/ - public LogEntryPosition searchForOffsetWithSize(long targetOffset, int startingPosition) { + public LogOffsetPosition searchForOffsetWithSize(long targetOffset, int startingPosition) { for (FileChannelRecordBatch batch : batchesFrom(startingPosition)) { long offset = batch.lastOffset(); if (offset >= targetOffset) - return new LogEntryPosition(offset, batch.position(), batch.sizeInBytes()); + return new LogOffsetPosition(offset, batch.position(), batch.sizeInBytes()); } return null; } @@ -429,12 +428,12 @@ public class FileRecords extends AbstractRecords implements Closeable { } } - public static class LogEntryPosition { + public static class LogOffsetPosition { public final long offset; public final int position; public final int size; - public LogEntryPosition(long offset, int position, int size) { + public LogOffsetPosition(long offset, int position, int size) { this.offset = offset; this.position = position; this.size = size; @@ -447,7 +446,7 @@ public class FileRecords extends AbstractRecords implements Closeable { if (o == null || getClass() != o.getClass()) return false; - LogEntryPosition that = (LogEntryPosition) o; + LogOffsetPosition that = (LogOffsetPosition) o; return offset == that.offset && position == that.position && @@ -465,7 +464,7 @@ public class FileRecords extends AbstractRecords implements Closeable { @Override public String toString() { - return "LogEntryPosition(" + + return "LogOffsetPosition(" + "offset=" + offset + ", position=" + position + ", size=" + size + http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 548cd45..c8754c7 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -292,6 +292,16 @@ public class MemoryRecords extends AbstractRecords { return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, timestampType, baseOffset); } + public static MemoryRecordsBuilder idempotentBuilder(ByteBuffer buffer, + CompressionType compressionType, + long baseOffset, + long producerId, + short producerEpoch, + int baseSequence) { + return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, + baseOffset, System.currentTimeMillis(), producerId, producerEpoch, baseSequence); + } + public static MemoryRecordsBuilder builder(ByteBuffer buffer, byte magic, CompressionType compressionType, @@ -307,7 +317,8 @@ public class MemoryRecords extends AbstractRecords { long baseOffset, long logAppendTime) { return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, - RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH); + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, false, + RecordBatch.NO_PARTITION_LEADER_EPOCH); } public static MemoryRecordsBuilder builder(ByteBuffer buffer, @@ -320,7 +331,8 @@ public class MemoryRecords extends AbstractRecords { if (timestampType == TimestampType.LOG_APPEND_TIME) logAppendTime = System.currentTimeMillis(); return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime, - RecordBatch.NO_PRODUCER_ID, 
RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional, RecordBatch.NO_PARTITION_LEADER_EPOCH); + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, isTransactional, + RecordBatch.NO_PARTITION_LEADER_EPOCH); } public static MemoryRecordsBuilder builder(ByteBuffer buffer, @@ -335,6 +347,18 @@ public class MemoryRecords extends AbstractRecords { } public static MemoryRecordsBuilder builder(ByteBuffer buffer, + CompressionType compressionType, + long baseOffset, + long producerId, + short producerEpoch, + int baseSequence, + boolean isTransactional) { + return builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME, baseOffset, + RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional, + RecordBatch.NO_PARTITION_LEADER_EPOCH); + } + + public static MemoryRecordsBuilder builder(ByteBuffer buffer, byte magic, CompressionType compressionType, TimestampType timestampType, @@ -359,18 +383,18 @@ public class MemoryRecords extends AbstractRecords { boolean isTransactional, int partitionLeaderEpoch) { return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset, - logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, partitionLeaderEpoch, + logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, partitionLeaderEpoch, buffer.remaining()); } - public static MemoryRecords withRecords(CompressionType compressionType, SimpleRecord... records) { return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, compressionType, records); } public static MemoryRecords withRecords(CompressionType compressionType, int partitionLeaderEpoch, SimpleRecord... records) { - return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records); + return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, + RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, + partitionLeaderEpoch, false, records); } public static MemoryRecords withRecords(byte magic, CompressionType compressionType, SimpleRecord... records) { @@ -378,30 +402,52 @@ public class MemoryRecords extends AbstractRecords { } public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, SimpleRecord... records) { - return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records); + return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, + records); } public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Integer partitionLeaderEpoch, SimpleRecord... records) { return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, records); + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch, false, records); } - public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType, - long producerId, short producerEpoch, int baseSequence, - int partitionLeaderEpoch, SimpleRecord... 
records) { + public static MemoryRecords withIdempotentRecords(byte magic, long initialOffset, CompressionType compressionType, + long producerId, short producerEpoch, int baseSequence, + int partitionLeaderEpoch, SimpleRecord... records) { return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch, - baseSequence, partitionLeaderEpoch, records); + baseSequence, partitionLeaderEpoch, false, records); + } + + public static MemoryRecords withIdempotentRecords(long initialOffset, CompressionType compressionType, long producerId, + short producerEpoch, int baseSequence, int partitionLeaderEpoch, + SimpleRecord... records) { + return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, + producerId, producerEpoch, baseSequence, partitionLeaderEpoch, false, records); + } + + public static MemoryRecords withTransactionalRecords(CompressionType compressionType, long producerId, + short producerEpoch, int baseSequence, SimpleRecord... records) { + return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, compressionType, TimestampType.CREATE_TIME, + producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records); + } + + public static MemoryRecords withTransactionalRecords(long initialOffset, CompressionType compressionType, long producerId, + short producerEpoch, int baseSequence, SimpleRecord... records) { + return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, + producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records); } public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType, TimestampType timestampType, SimpleRecord... records) { return withRecords(magic, initialOffset, compressionType, timestampType, RecordBatch.NO_PRODUCER_ID, - RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH, records); + RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.NO_PARTITION_LEADER_EPOCH, + false, records); } - private static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType, - TimestampType timestampType, long producerId, short producerEpoch, - int baseSequence, int partitionLeaderEpoch, SimpleRecord ... records) { + public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType, + TimestampType timestampType, long producerId, short producerEpoch, + int baseSequence, int partitionLeaderEpoch, boolean isTransactional, + SimpleRecord ... 
records) { if (records.length == 0) return MemoryRecords.EMPTY; int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records)); @@ -409,11 +455,38 @@ public class MemoryRecords extends AbstractRecords { long logAppendTime = RecordBatch.NO_TIMESTAMP; if (timestampType == TimestampType.LOG_APPEND_TIME) logAppendTime = System.currentTimeMillis(); - MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset, - logAppendTime, producerId, producerEpoch, baseSequence, false, partitionLeaderEpoch); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, + initialOffset, logAppendTime, producerId, producerEpoch, baseSequence, isTransactional, false, + partitionLeaderEpoch, buffer.capacity()); for (SimpleRecord record : records) builder.append(record); return builder.build(); } + public static MemoryRecords withEndTransactionMarker(long producerId, short producerEpoch, EndTransactionMarker marker) { + return withEndTransactionMarker(0L, producerId, producerEpoch, marker); + } + + public static MemoryRecords withEndTransactionMarker(long initialOffset, long producerId, short producerEpoch, + EndTransactionMarker marker) { + int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + + EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE; + ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize); + writeEndTransactionalMarker(buffer, initialOffset, producerId, producerEpoch, marker); + buffer.flip(); + return MemoryRecords.readableRecords(buffer); + } + + public static void writeEndTransactionalMarker(ByteBuffer buffer, long initialOffset, long producerId, + short producerEpoch, EndTransactionMarker marker) { + boolean isTransactional = true; + boolean isControlBatch = true; + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE, + TimestampType.CREATE_TIME, initialOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, + RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, RecordBatch.NO_PARTITION_LEADER_EPOCH, + buffer.capacity()); + builder.appendEndTxnMarker(System.currentTimeMillis(), marker); + builder.close(); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index b9d65a5..f7451cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -66,6 +66,7 @@ public class MemoryRecordsBuilder { private final long baseOffset; private final long logAppendTime; private final boolean isTransactional; + private final boolean isControlBatch; private final int partitionLeaderEpoch; private final int writeLimit; private final int initialCapacity; @@ -112,17 +113,18 @@ public class MemoryRecordsBuilder { short producerEpoch, int baseSequence, boolean isTransactional, + boolean isControlBatch, int partitionLeaderEpoch, int writeLimit) { if (magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.NO_TIMESTAMP_TYPE) throw new IllegalArgumentException("TimestampType must be set for magic >= 0"); - - if (isTransactional) { - 
if (magic < RecordBatch.MAGIC_VALUE_V2) - throw new IllegalArgumentException("Transactional messages are not supported for magic " + magic); + if (magic < RecordBatch.MAGIC_VALUE_V2) { + if (isTransactional) + throw new IllegalArgumentException("Transactional records are not supported for magic " + magic); + if (isControlBatch) + throw new IllegalArgumentException("Control records are not supported for magic " + magic); } - this.magic = magic; this.timestampType = timestampType; this.compressionType = compressionType; @@ -137,6 +139,7 @@ public class MemoryRecordsBuilder { this.producerEpoch = producerEpoch; this.baseSequence = baseSequence; this.isTransactional = isTransactional; + this.isControlBatch = isControlBatch; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialCapacity = buffer.capacity(); @@ -254,7 +257,7 @@ public class MemoryRecordsBuilder { if (producerEpoch == RecordBatch.NO_PRODUCER_EPOCH) throw new IllegalArgumentException("Invalid negative producer epoch"); - if (baseSequence == RecordBatch.NO_SEQUENCE) + if (baseSequence < 0 && !isControlBatch) throw new IllegalArgumentException("Invalid negative sequence number used"); if (magic < RecordBatch.MAGIC_VALUE_V2) @@ -298,7 +301,7 @@ public class MemoryRecordsBuilder { } DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, - baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, + baseTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, numRecords); buffer.position(pos); @@ -326,26 +329,26 @@ public class MemoryRecordsBuilder { } private long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key, - ByteBuffer value, Header[] headers) { + ByteBuffer value, Header[] headers) { try { + if (isControlRecord != isControlBatch) + throw new IllegalArgumentException("Control records can only be appended to control batches"); + if (lastOffset != null && offset <= lastOffset) - throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s (Offsets must increase monotonically).", offset, lastOffset)); + throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " + + "(Offsets must increase monotonically).", offset, lastOffset)); if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP) throw new IllegalArgumentException("Invalid negative timestamp " + timestamp); - if (magic < RecordBatch.MAGIC_VALUE_V2) { - if (isControlRecord) - throw new IllegalArgumentException("Magic v" + magic + " does not support control records"); - if (headers != null && headers.length > 0) - throw new IllegalArgumentException("Magic v" + magic + " does not support record headers"); - } + if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) + throw new IllegalArgumentException("Magic v" + magic + " does not support record headers"); if (baseTimestamp == null) baseTimestamp = timestamp; if (magic > RecordBatch.MAGIC_VALUE_V1) - return appendDefaultRecord(offset, isControlRecord, timestamp, key, value, headers); + return appendDefaultRecord(offset, timestamp, key, value, headers); else return appendLegacyRecord(offset, timestamp, key, value); } catch (IOException e) { @@ -388,7 +391,7 @@ public class MemoryRecordsBuilder { * @return crc of the record */ public long appendWithOffset(long offset, long timestamp, byte[] key, byte[] value) { 
-        return appendWithOffset(offset, false, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
+        return appendWithOffset(offset, timestamp, wrapNullable(key), wrapNullable(value), Record.EMPTY_HEADERS);
     }
     /**
@@ -400,7 +403,7 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long appendWithOffset(long offset, long timestamp, ByteBuffer key, ByteBuffer value) {
-        return appendWithOffset(offset, false, timestamp, key, value, Record.EMPTY_HEADERS);
+        return appendWithOffset(offset, timestamp, key, value, Record.EMPTY_HEADERS);
     }
     /**
@@ -410,7 +413,7 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long appendWithOffset(long offset, SimpleRecord record) {
-        return appendWithOffset(offset, false, record.timestamp(), record.key(), record.value(), record.headers());
+        return appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
     }
@@ -434,7 +437,7 @@ public class MemoryRecordsBuilder {
      * @return crc of the record
      */
     public long append(long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) {
-        return appendWithOffset(nextSequentialOffset(), false, timestamp, key, value, headers);
+        return appendWithOffset(nextSequentialOffset(), timestamp, key, value, headers);
     }
     /**
@@ -476,7 +479,7 @@ public class MemoryRecordsBuilder {
      * @param value The control record value
      * @return crc of the record
      */
-    public long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
+    private long appendControlRecord(long timestamp, ControlRecordType type, ByteBuffer value) {
         Struct keyStruct = type.recordKey();
         ByteBuffer key = ByteBuffer.allocate(keyStruct.sizeOf());
         keyStruct.writeTo(key);
@@ -484,6 +487,15 @@ public class MemoryRecordsBuilder {
         return appendWithOffset(nextSequentialOffset(), true, timestamp, key, value, Record.EMPTY_HEADERS);
     }
+    public long appendEndTxnMarker(long timestamp, EndTransactionMarker marker) {
+        if (producerId == RecordBatch.NO_PRODUCER_ID)
+            throw new IllegalArgumentException("End transaction marker requires a valid producerId");
+        if (!isTransactional)
+            throw new IllegalArgumentException("End transaction marker depends on batch transactional flag being enabled");
+        ByteBuffer value = marker.serializeValue();
+        return appendControlRecord(timestamp, marker.controlType(), value);
+    }
+
     /**
      * Add a legacy record without doing offset/magic validation (this should only be used in testing).
      * @param offset The offset of the record
@@ -509,8 +521,7 @@ public class MemoryRecordsBuilder {
      * @param record the record to add
      */
     public void append(Record record) {
-        appendWithOffset(record.offset(), record.isControlRecord(), record.timestamp(), record.key(), record.value(),
-                record.headers());
+        appendWithOffset(record.offset(), record.timestamp(), record.key(), record.value(), record.headers());
     }
     /**
@@ -519,8 +530,7 @@ public class MemoryRecordsBuilder {
      * @param record The record to add
      */
     public void appendWithOffset(long offset, Record record) {
-        appendWithOffset(offset, record.isControlRecord(), record.timestamp(), record.key(), record.value(),
-                record.headers());
+        appendWithOffset(offset, record.timestamp(), record.key(), record.value(), record.headers());
     }
     /**
@@ -542,12 +552,12 @@ public class MemoryRecordsBuilder {
         appendWithOffset(nextSequentialOffset(), record);
     }
-    private long appendDefaultRecord(long offset, boolean isControlRecord, long timestamp,
-                                     ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException {
+    private long appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
+                                     Header[] headers) throws IOException {
         ensureOpenForRecordAppend();
         int offsetDelta = (int) (offset - baseOffset);
         long timestampDelta = timestamp - baseTimestamp;
-        long crc = DefaultRecord.writeTo(appendStream, isControlRecord, offsetDelta, timestampDelta, key, value, headers);
+        long crc = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
         // TODO: The crc is useless for the new message format. Maybe we should let writeTo return the written size?
         recordWritten(offset, timestamp, DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, key, value, headers));
         return crc;
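The appendEndTxnMarker method added above is deliberately narrow: it only succeeds on a builder that was constructed with a valid producerId and with both the transactional and control-batch flags set, since the constructor and appendWithOffset checks earlier in this diff reject any other combination. As a rough usage sketch, not taken from this patch (the helper class, the pre-configured builder argument, and the two-argument EndTransactionMarker constructor are assumptions for illustration only):

import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;

// Hypothetical helper, not part of this commit: append a COMMIT control record to a builder
// that was opened with isTransactional = true, isControlBatch = true and a real producerId,
// then close the batch into MemoryRecords.
public final class EndTxnMarkerSketch {
    public static MemoryRecords appendCommitMarker(MemoryRecordsBuilder controlBatchBuilder,
                                                   long timestamp,
                                                   int coordinatorEpoch) {
        EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
        controlBatchBuilder.appendEndTxnMarker(timestamp, marker);
        return controlBatchBuilder.build();
    }
}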
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index fdf41b3..cba6fc5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -133,15 +133,6 @@ public interface Record {
     boolean hasTimestampType(TimestampType timestampType);
     /**
-     * Check whether this is a control record (i.e. whether the control bit is set in the record attributes).
-     * For magic versions prior to 2, this is always false.
-     *
-     * @return Whether this is a control record
-     */
-    boolean isControlRecord();
-
-
-    /**
      * Get the headers. For magic versions 1 and below, this always returns an empty array.
      *
      * @return the array of headers
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 4fd03e1..c984deb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -216,4 +216,12 @@ public interface RecordBatch extends Iterable<Record> {
      * @return The closeable iterator
      */
     CloseableIterator<Record> streamingIterator();
+
+    /**
+     * Check whether this is a control batch (i.e. whether the control bit is set in the batch attributes).
+     * For magic versions prior to 2, this is always false.
+     *
+     * @return Whether this is a batch containing control records
+     */
+    boolean isControlBatch();
 }
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
index 7dbffd1..03f6ee5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
@@ -40,6 +40,7 @@ public class ListOffsetRequest extends AbstractRequest {
     public static final int DEBUGGING_REPLICA_ID = -2;
     private static final String REPLICA_ID_KEY_NAME = "replica_id";
+    private static final String ISOLATION_LEVEL_KEY_NAME = "isolation_level";
     private static final String TOPICS_KEY_NAME = "topics";
     // topic level field names
@@ -52,6 +53,7 @@ public class ListOffsetRequest extends AbstractRequest {
     private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
     private final int replicaId;
+    private final IsolationLevel isolationLevel;
     private final Map<TopicPartition, PartitionData> offsetData;
     private final Map<TopicPartition, Long> partitionTimestamps;
     private final Set<TopicPartition> duplicatePartitions;
@@ -59,23 +61,29 @@ public class ListOffsetRequest extends AbstractRequest {
     public static class Builder extends AbstractRequest.Builder<ListOffsetRequest> {
         private final int replicaId;
         private final short minVersion;
+        private final IsolationLevel isolationLevel;
         private Map<TopicPartition, PartitionData> offsetData = null;
         private Map<TopicPartition, Long> partitionTimestamps = null;
         public static Builder forReplica(short desiredVersion, int replicaId) {
-            return new Builder((short) 0, desiredVersion, replicaId);
+            return new Builder((short) 0, desiredVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
         }
-        public static Builder forConsumer(boolean requireTimestamp) {
+        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
             // If we need a timestamp in the response, the minimum RPC version we can send is v1. Otherwise, v0 is OK.
-            short minVersion = requireTimestamp ? (short) 1 : (short) 0;
-            return new Builder(minVersion, null, CONSUMER_REPLICA_ID);
+            short minVersion = 0;
+            if (isolationLevel == IsolationLevel.READ_COMMITTED)
+                minVersion = 2;
+            else if (requireTimestamp)
+                minVersion = 1;
+            return new Builder(minVersion, null, CONSUMER_REPLICA_ID, isolationLevel);
         }
-        private Builder(short minVersion, Short desiredVersion, int replicaId) {
+        private Builder(short minVersion, Short desiredVersion, int replicaId, IsolationLevel isolationLevel) {
             super(ApiKeys.LIST_OFFSETS, desiredVersion);
             this.minVersion = minVersion;
             this.replicaId = replicaId;
+            this.isolationLevel = isolationLevel;
         }
         public Builder setOffsetData(Map<TopicPartition, PartitionData> offsetData) {
@@ -118,7 +126,7 @@ public class ListOffsetRequest extends AbstractRequest {
                 }
             }
             Map<TopicPartition, ?> m = (version == 0) ? offsetData : partitionTimestamps;
-            return new ListOffsetRequest(replicaId, m, version);
+            return new ListOffsetRequest(replicaId, m, isolationLevel, version);
         }
         @Override
@@ -165,9 +173,10 @@ public class ListOffsetRequest extends AbstractRequest {
     * Private constructor with a specified version.
     */
    @SuppressWarnings("unchecked")
-    private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, short version) {
+    private ListOffsetRequest(int replicaId, Map<TopicPartition, ?> targetTimes, IsolationLevel isolationLevel, short version) {
        super(version);
        this.replicaId = replicaId;
+        this.isolationLevel = isolationLevel;
        this.offsetData = version == 0 ? (Map<TopicPartition, PartitionData>) targetTimes : null;
        this.partitionTimestamps = version >= 1 ? (Map<TopicPartition, Long>) targetTimes : null;
        this.duplicatePartitions = Collections.emptySet();
@@ -177,6 +186,9 @@ public class ListOffsetRequest extends AbstractRequest {
        super(version);
        Set<TopicPartition> duplicatePartitions = new HashSet<>();
        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
+        isolationLevel = struct.hasField(ISOLATION_LEVEL_KEY_NAME) ?
+                IsolationLevel.forId(struct.getByte(ISOLATION_LEVEL_KEY_NAME)) :
+                IsolationLevel.READ_UNCOMMITTED;
        offsetData = new HashMap<>();
        partitionTimestamps = new HashMap<>();
        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
@@ -223,7 +235,6 @@ public class ListOffsetRequest extends AbstractRequest {
        switch (versionId) {
            case 0:
            case 1:
-                return new ListOffsetResponse(responseData);
            case 2:
                return new ListOffsetResponse(throttleTimeMs, responseData);
            default:
@@ -236,6 +247,10 @@ public class ListOffsetRequest extends AbstractRequest {
        return replicaId;
    }
+    public IsolationLevel isolationLevel() {
+        return isolationLevel;
+    }
+
    @Deprecated
    public Map<TopicPartition, PartitionData> offsetData() {
        return offsetData;
@@ -262,6 +277,9 @@ public class ListOffsetRequest extends AbstractRequest {
        Map<String, Map<Integer, Object>> topicsData = CollectionUtils.groupDataByTopic(targetTimes);
        struct.set(REPLICA_ID_KEY_NAME, replicaId);
+
+        if (struct.hasField(ISOLATION_LEVEL_KEY_NAME))
+            struct.set(ISOLATION_LEVEL_KEY_NAME, isolationLevel.id());
        List<Struct> topicArray = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, Object>> topicEntry: topicsData.entrySet()) {
            Struct topicData = struct.instance(TOPICS_KEY_NAME);
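The forConsumer change above also fixes the minimum usable version of the ListOffsets RPC: READ_COMMITTED requires v2, which is the first version carrying the isolation_level field, while a timestamp-bearing response still only needs v1 and anything else can fall back to v0. A small standalone sketch of that selection rule (the class name is made up for illustration, and the org.apache.kafka.common.requests location of IsolationLevel is assumed):

import org.apache.kafka.common.requests.IsolationLevel;

// Mirrors the minimum-version rule added to ListOffsetRequest.Builder.forConsumer.
public final class ListOffsetVersionSketch {
    public static short minimumListOffsetVersion(boolean requireTimestamp, IsolationLevel isolationLevel) {
        if (isolationLevel == IsolationLevel.READ_COMMITTED)
            return (short) 2;   // only v2 carries isolation_level
        if (requireTimestamp)
            return (short) 1;   // v1 is the first version that returns a timestamp
        return (short) 0;
    }
}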
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index cb0ff89..24ee788 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -762,6 +762,11 @@ public class Utils {
         } while (bytesRead != -1 && destinationBuffer.hasRemaining());
     }
+    public static void writeFully(FileChannel channel, ByteBuffer sourceBuffer) throws IOException {
+        while (sourceBuffer.hasRemaining())
+            channel.write(sourceBuffer);
+    }
+
     /**
      * Write the contents of a buffer to an output stream. The bytes are copied from the current position
      * in the buffer.
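The new Utils.writeFully above loops until the source buffer has no bytes remaining, so a short write from the FileChannel cannot silently drop the tail of the buffer. A minimal usage sketch, with the file name and payload invented for illustration:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

import org.apache.kafka.common.utils.Utils;

// Usage sketch (not from this patch): write the whole buffer to a channel even if the
// channel accepts it across several partial writes.
public final class WriteFullySketch {
    public static void main(String[] args) throws IOException {
        ByteBuffer payload = ByteBuffer.wrap("example-entry".getBytes(StandardCharsets.UTF_8));
        try (FileChannel channel = FileChannel.open(Paths.get("example.tmp"),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            Utils.writeFully(channel, payload);
        }
    }
}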
