This is an automated email from the ASF dual-hosted git repository. mimaison pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 430e00e KAFKA-8436: use automated protocol for AddOffsetsToTxn (#7015) 430e00e is described below commit 430e00ea95da959d6d8308dd49c4ed1cdffa7914 Author: Boyang Chen <boy...@confluent.io> AuthorDate: Sat Apr 11 04:54:53 2020 -0700 KAFKA-8436: use automated protocol for AddOffsetsToTxn (#7015) Reviewers: Mickael Maison <mickael.mai...@gmail.com> --- .../producer/internals/TransactionManager.java | 18 +++-- .../org/apache/kafka/common/protocol/ApiKeys.java | 8 +- .../kafka/common/requests/AbstractResponse.java | 2 +- .../common/requests/AddOffsetsToTxnRequest.java | 94 ++++------------------ .../common/requests/AddOffsetsToTxnResponse.java | 78 ++++++------------ .../kafka/clients/producer/KafkaProducerTest.java | 5 +- .../producer/internals/TransactionManagerTest.java | 14 ++-- .../kafka/common/requests/RequestResponseTest.java | 14 +++- core/src/main/scala/kafka/server/KafkaApis.scala | 24 ++++-- .../kafka/api/AuthorizerIntegrationTest.scala | 11 ++- .../scala/unit/kafka/server/RequestQuotaTest.scala | 8 +- 11 files changed, 116 insertions(+), 160 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c432e50..fa473be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -40,6 +40,7 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.AddOffsetsToTxnRequestData; import org.apache.kafka.common.message.EndTxnRequestData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.InitProducerIdRequestData; @@ -395,9 +396,15 @@ public class TransactionManager { "active transaction"); log.debug("Begin adding offsets {} for consumer group {} to transaction", offsets, groupMetadata); - AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder(transactionalId, - producerIdAndEpoch.producerId, producerIdAndEpoch.epoch, groupMetadata.groupId()); + AddOffsetsToTxnRequest.Builder builder = new AddOffsetsToTxnRequest.Builder( + new AddOffsetsToTxnRequestData() + .setTransactionalId(transactionalId) + .setProducerId(producerIdAndEpoch.producerId) + .setProducerEpoch(producerIdAndEpoch.epoch) + .setGroupId(groupMetadata.groupId()) + ); AddOffsetsToTxnHandler handler = new AddOffsetsToTxnHandler(builder, offsets, groupMetadata); + enqueueRequest(handler); return handler.result; } @@ -1610,13 +1617,14 @@ public class TransactionManager { @Override public void handleResponse(AbstractResponse response) { AddOffsetsToTxnResponse addOffsetsToTxnResponse = (AddOffsetsToTxnResponse) response; - Errors error = addOffsetsToTxnResponse.error(); + Errors error = Errors.forCode(addOffsetsToTxnResponse.data.errorCode()); if (error == Errors.NONE) { - log.debug("Successfully added partition for consumer group {} to transaction", builder.consumerGroupId()); + log.debug("Successfully added partition for consumer group {} to transaction", builder.data.groupId()); // note the result is not completed until the TxnOffsetCommit returns pendingRequests.add(txnOffsetCommitHandler(result, offsets, groupMetadata)); + transactionStarted = true; } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { lookupCoordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId); @@ -1630,7 +1638,7 @@ public class TransactionManager { } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { fatalError(error.exception()); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { - abortableError(GroupAuthorizationException.forGroupId(builder.consumerGroupId())); + abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId())); } else { fatalError(new KafkaException("Unexpected error in AddOffsetsToTxnResponse: " + error.message())); } diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 80d9383..411f337 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -19,6 +19,8 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; import org.apache.kafka.common.message.ApiMessageType; +import org.apache.kafka.common.message.AddOffsetsToTxnRequestData; +import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.message.ApiVersionsRequestData; import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.AlterClientQuotasRequestData; @@ -106,8 +108,6 @@ import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; -import org.apache.kafka.common.requests.AddOffsetsToTxnResponse; import org.apache.kafka.common.requests.AddPartitionsToTxnRequest; import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest; @@ -174,8 +174,8 @@ public enum ApiKeys { OffsetsForLeaderEpochResponse.schemaVersions()), ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddPartitionsToTxnRequest.schemaVersions(), AddPartitionsToTxnResponse.schemaVersions()), - ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(), - AddOffsetsToTxnResponse.schemaVersions()), + ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequestData.SCHEMAS, + AddOffsetsToTxnResponseData.SCHEMAS), END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequestData.SCHEMAS, EndTxnResponseData.SCHEMAS), WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequestData.SCHEMAS, WriteTxnMarkersResponseData.SCHEMAS), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 7f2f4bc..dcd5439 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -131,7 +131,7 @@ public abstract class AbstractResponse implements AbstractRequestResponse { case ADD_PARTITIONS_TO_TXN: return new AddPartitionsToTxnResponse(struct); case ADD_OFFSETS_TO_TXN: - return new AddOffsetsToTxnResponse(struct); + return new AddOffsetsToTxnResponse(struct, version); case END_TXN: return new EndTxnResponse(struct, version); case WRITE_TXN_MARKERS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java index 2668ae1..3b1c746 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java @@ -16,124 +16,60 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.AddOffsetsToTxnRequestData; +import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import static org.apache.kafka.common.protocol.CommonFields.GROUP_ID; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH; -import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID; -import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID; - public class AddOffsetsToTxnRequest extends AbstractRequest { - private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V0 = new Schema( - TRANSACTIONAL_ID, - PRODUCER_ID, - PRODUCER_EPOCH, - GROUP_ID); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ADD_OFFSETS_TO_TXN_REQUEST_V1 = ADD_OFFSETS_TO_TXN_REQUEST_V0; - public static Schema[] schemaVersions() { - return new Schema[]{ADD_OFFSETS_TO_TXN_REQUEST_V0, ADD_OFFSETS_TO_TXN_REQUEST_V1}; - } + public AddOffsetsToTxnRequestData data; public static class Builder extends AbstractRequest.Builder<AddOffsetsToTxnRequest> { - private final String transactionalId; - private final long producerId; - private final short producerEpoch; - private final String consumerGroupId; + public AddOffsetsToTxnRequestData data; - public Builder(String transactionalId, long producerId, short producerEpoch, String consumerGroupId) { + public Builder(AddOffsetsToTxnRequestData data) { super(ApiKeys.ADD_OFFSETS_TO_TXN); - this.transactionalId = transactionalId; - this.producerId = producerId; - this.producerEpoch = producerEpoch; - this.consumerGroupId = consumerGroupId; - } - - public String consumerGroupId() { - return consumerGroupId; + this.data = data; } @Override public AddOffsetsToTxnRequest build(short version) { - return new AddOffsetsToTxnRequest(version, transactionalId, producerId, producerEpoch, consumerGroupId); + return new AddOffsetsToTxnRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=AddOffsetsToTxnRequest"). - append(", transactionalId=").append(transactionalId). - append(", producerId=").append(producerId). - append(", producerEpoch=").append(producerEpoch). - append(", consumerGroupId=").append(consumerGroupId). - append(")"); - return bld.toString(); + return data.toString(); } } - private final String transactionalId; - private final long producerId; - private final short producerEpoch; - private final String consumerGroupId; - - private AddOffsetsToTxnRequest(short version, String transactionalId, long producerId, short producerEpoch, String consumerGroupId) { + public AddOffsetsToTxnRequest(AddOffsetsToTxnRequestData data, short version) { super(ApiKeys.ADD_OFFSETS_TO_TXN, version); - this.transactionalId = transactionalId; - this.producerId = producerId; - this.producerEpoch = producerEpoch; - this.consumerGroupId = consumerGroupId; + this.data = data; } public AddOffsetsToTxnRequest(Struct struct, short version) { super(ApiKeys.ADD_OFFSETS_TO_TXN, version); - this.transactionalId = struct.get(TRANSACTIONAL_ID); - this.producerId = struct.get(PRODUCER_ID); - this.producerEpoch = struct.get(PRODUCER_EPOCH); - this.consumerGroupId = struct.get(GROUP_ID); - } - - public String transactionalId() { - return transactionalId; - } - - public long producerId() { - return producerId; - } - - public short producerEpoch() { - return producerEpoch; - } - - public String consumerGroupId() { - return consumerGroupId; + this.data = new AddOffsetsToTxnRequestData(struct, version); } @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.requestSchema(version())); - struct.set(TRANSACTIONAL_ID, transactionalId); - struct.set(PRODUCER_ID, producerId); - struct.set(PRODUCER_EPOCH, producerEpoch); - struct.set(GROUP_ID, consumerGroupId); - return struct; + return data.toStruct(version()); } @Override public AddOffsetsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new AddOffsetsToTxnResponse(throttleTimeMs, Errors.forException(e)); + return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData() + .setErrorCode(Errors.forException(e).code()) + .setThrottleTimeMs(throttleTimeMs)); } public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) { return new AddOffsetsToTxnRequest(ApiKeys.ADD_OFFSETS_TO_TXN.parseRequest(version, buffer), version); } - } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java index 867ca6a..3747bb9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java @@ -16,86 +16,60 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; - +/** + * Possible error codes: + * + * - {@link Errors#NOT_COORDINATOR} + * - {@link Errors#COORDINATOR_NOT_AVAILABLE} + * - {@link Errors#COORDINATOR_LOAD_IN_PROGRESS} + * - {@link Errors#INVALID_PRODUCER_ID_MAPPING} + * - {@link Errors#INVALID_PRODUCER_EPOCH} + * - {@link Errors#INVALID_TXN_STATE} + * - {@link Errors#GROUP_AUTHORIZATION_FAILED} + * - {@link Errors#TRANSACTIONAL_ID_AUTHORIZATION_FAILED} + */ public class AddOffsetsToTxnResponse extends AbstractResponse { - private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema( - THROTTLE_TIME_MS, - ERROR_CODE); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V1 = ADD_OFFSETS_TO_TXN_RESPONSE_V0; - public static Schema[] schemaVersions() { - return new Schema[]{ADD_OFFSETS_TO_TXN_RESPONSE_V0, ADD_OFFSETS_TO_TXN_RESPONSE_V1}; - } - - // Possible error codes: - // NotCoordinator - // CoordinatorNotAvailable - // CoordinatorLoadInProgress - // InvalidProducerIdMapping - // InvalidProducerEpoch - // InvalidTxnState - // GroupAuthorizationFailed - // TransactionalIdAuthorizationFailed + public AddOffsetsToTxnResponseData data; - private final Errors error; - private final int throttleTimeMs; - - public AddOffsetsToTxnResponse(int throttleTimeMs, Errors error) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; + public AddOffsetsToTxnResponse(AddOffsetsToTxnResponseData data) { + this.data = data; } - public AddOffsetsToTxnResponse(Struct struct) { - this.throttleTimeMs = struct.get(THROTTLE_TIME_MS); - this.error = Errors.forCode(struct.get(ERROR_CODE)); + public AddOffsetsToTxnResponse(Struct struct, short version) { + this.data = new AddOffsetsToTxnResponseData(struct, version); } @Override - public int throttleTimeMs() { - return throttleTimeMs; - } - - public Errors error() { - return error; + public Map<Errors, Integer> errorCounts() { + return errorCounts(Errors.forCode(data.errorCode())); } @Override - public Map<Errors, Integer> errorCounts() { - return errorCounts(error); + protected Struct toStruct(short version) { + return data.toStruct(version); } @Override - protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); - struct.set(ERROR_CODE, error.code()); - return struct; + public int throttleTimeMs() { + return data.throttleTimeMs(); } public static AddOffsetsToTxnResponse parse(ByteBuffer buffer, short version) { - return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer)); + return new AddOffsetsToTxnResponse(ApiKeys.ADD_OFFSETS_TO_TXN.parseResponse(version, buffer), version); } @Override public String toString() { - return "AddOffsetsToTxnResponse(" + - "error=" + error + - ", throttleTimeMs=" + throttleTimeMs + - ')'; + return data.toString(); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index 8fec21c..280faa1 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.message.EndTxnResponseData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.message.TxnOffsetCommitRequestData; @@ -959,7 +960,9 @@ public class KafkaProducerTest { } private AddOffsetsToTxnResponse addOffsetsToTxnResponse(Errors error) { - return new AddOffsetsToTxnResponse(10, error); + return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData() + .setErrorCode(error.code()) + .setThrottleTimeMs(10)); } private TxnOffsetCommitResponse txnOffsetsCommitResponse(Map<TopicPartition, Errors> errorMap) { diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index c700605..5650110 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -42,6 +42,7 @@ import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.internals.ClusterResourceListeners; +import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.message.EndTxnResponseData; import org.apache.kafka.common.message.InitProducerIdResponseData; import org.apache.kafka.common.metrics.Metrics; @@ -3258,12 +3259,15 @@ public class TransactionManagerTest { final short producerEpoch) { client.prepareResponse(body -> { AddOffsetsToTxnRequest addOffsetsToTxnRequest = (AddOffsetsToTxnRequest) body; - assertEquals(consumerGroupId, addOffsetsToTxnRequest.consumerGroupId()); - assertEquals(transactionalId, addOffsetsToTxnRequest.transactionalId()); - assertEquals(producerId, addOffsetsToTxnRequest.producerId()); - assertEquals(producerEpoch, addOffsetsToTxnRequest.producerEpoch()); + assertEquals(consumerGroupId, addOffsetsToTxnRequest.data.groupId()); + assertEquals(transactionalId, addOffsetsToTxnRequest.data.transactionalId()); + assertEquals(producerId, addOffsetsToTxnRequest.data.producerId()); + assertEquals(producerEpoch, addOffsetsToTxnRequest.data.producerEpoch()); return true; - }, new AddOffsetsToTxnResponse(0, error)); + }, new AddOffsetsToTxnResponse( + new AddOffsetsToTxnResponseData() + .setErrorCode(error.code())) + ); } private void prepareTxnOffsetCommitResponse(final String consumerGroupId, diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 7a6ce7f..0f9ab8c 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -33,6 +33,8 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.AddOffsetsToTxnRequestData; +import org.apache.kafka.common.message.AddOffsetsToTxnResponseData; import org.apache.kafka.common.message.AlterConfigsResponseData; import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData; import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData; @@ -1632,11 +1634,19 @@ public class RequestResponseTest { } private AddOffsetsToTxnRequest createAddOffsetsToTxnRequest() { - return new AddOffsetsToTxnRequest.Builder("tid", 21L, (short) 42, "gid").build(); + return new AddOffsetsToTxnRequest.Builder( + new AddOffsetsToTxnRequestData() + .setTransactionalId("tid") + .setProducerId(21L) + .setProducerEpoch((short) 42) + .setGroupId("gid") + ).build(); } private AddOffsetsToTxnResponse createAddOffsetsToTxnResponse() { - return new AddOffsetsToTxnResponse(0, Errors.NONE); + return new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData() + .setErrorCode(Errors.NONE.code()) + .setThrottleTimeMs(0)); } private EndTxnRequest createEndTxnRequest() { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 43ab2c5..57ce87c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -51,7 +51,7 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS import org.apache.kafka.common.message.AlterConfigsResponseData.AlterConfigsResourceResponse import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult -import org.apache.kafka.common.message.{AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, HeartbeatResponseData, InitProducer [...] +import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeGroupsResponseData, DescribeLogDirsResponseData, EndTxnResponseData, ExpireDelegationTokenResponseData, FindCoordinatorResponseData, Heartb [...] import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultCollection} import org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult, DeletableGroupResultCollection} import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse, ReassignableTopicResponse} @@ -2154,20 +2154,28 @@ class KafkaApis(val requestChannel: RequestChannel, def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = { ensureInterBrokerVersion(KAFKA_0_11_0_IV0) val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest] - val transactionalId = addOffsetsToTxnRequest.transactionalId - val groupId = addOffsetsToTxnRequest.consumerGroupId + val transactionalId = addOffsetsToTxnRequest.data.transactionalId + val groupId = addOffsetsToTxnRequest.data.groupId val offsetTopicPartition = new TopicPartition(GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) if (!authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) sendResponseMaybeThrottle(request, requestThrottleMs => - new AddOffsetsToTxnResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED)) + new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData() + .setErrorCode(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.code) + .setThrottleTimeMs(requestThrottleMs))) else if (!authorize(request.context, READ, GROUP, groupId)) sendResponseMaybeThrottle(request, requestThrottleMs => - new AddOffsetsToTxnResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)) + new AddOffsetsToTxnResponse(new AddOffsetsToTxnResponseData() + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code) + .setThrottleTimeMs(requestThrottleMs)) + ) else { def sendResponseCallback(error: Errors): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { - val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse(requestThrottleMs, error) + val responseBody: AddOffsetsToTxnResponse = new AddOffsetsToTxnResponse( + new AddOffsetsToTxnResponseData() + .setErrorCode(error.code) + .setThrottleTimeMs(requestThrottleMs)) trace(s"Completed $transactionalId's AddOffsetsToTxnRequest for group $groupId on partition " + s"$offsetTopicPartition: errors: $error from client ${request.header.clientId}") responseBody @@ -2176,8 +2184,8 @@ class KafkaApis(val requestChannel: RequestChannel, } txnCoordinator.handleAddPartitionsToTransaction(transactionalId, - addOffsetsToTxnRequest.producerId, - addOffsetsToTxnRequest.producerEpoch, + addOffsetsToTxnRequest.data.producerId, + addOffsetsToTxnRequest.data.producerEpoch, Set(offsetTopicPartition), sendResponseCallback) } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 1b57cff..9a7ec14 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig} import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME +import org.apache.kafka.common.message.AddOffsetsToTxnRequestData import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterableConfig, AlterableConfigCollection} @@ -184,7 +185,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.INIT_PRODUCER_ID -> ((resp: InitProducerIdResponse) => resp.error), ApiKeys.WRITE_TXN_MARKERS -> ((resp: WriteTxnMarkersResponse) => resp.errors(producerId).get(tp)), ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), - ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error), + ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => Errors.forCode(resp.data.errorCode)), ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error), ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)), ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => Errors.forCode(resp.results.asScala.head.errorCode)), @@ -533,7 +534,13 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def addPartitionsToTxnRequest = new AddPartitionsToTxnRequest.Builder(transactionalId, 1, 1, Collections.singletonList(tp)).build() - private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build() + private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder( + new AddOffsetsToTxnRequestData() + .setTransactionalId(transactionalId) + .setProducerId(1) + .setProducerEpoch(1) + .setGroupId(group) + ).build() private def electLeadersRequest = new ElectLeadersRequest.Builder( ElectionType.PREFERRED, diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 42220a8..c47d511 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -24,6 +24,7 @@ import kafka.security.authorizer.AclAuthorizer import kafka.utils.TestUtils import org.apache.kafka.common.acl._ import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.message.AddOffsetsToTxnRequestData import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection @@ -395,7 +396,12 @@ class RequestQuotaTest extends BaseRequestTest { new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava) case ApiKeys.ADD_OFFSETS_TO_TXN => - new AddOffsetsToTxnRequest.Builder("test-transactional-id", 1, 0, "test-txn-group") + new AddOffsetsToTxnRequest.Builder(new AddOffsetsToTxnRequestData() + .setTransactionalId("test-transactional-id") + .setProducerId(1) + .setProducerEpoch(0) + .setGroupId("test-txn-group") + ) case ApiKeys.END_TXN => new EndTxnRequest.Builder(new EndTxnRequestData()