This is an automated email from the ASF dual-hosted git repository. manikumar pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 926fb35 KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken 926fb35 is described below commit 926fb35d9dcefd45c1e1d276ee7252b15875f23e Author: Mickael Maison <mickael.mai...@gmail.com> AuthorDate: Wed Aug 7 13:32:26 2019 +0530 KAFKA-8599: Use automatic RPC generation in ExpireDelegationToken Author: Mickael Maison <mickael.mai...@gmail.com> Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, Viktor Somogyi <viktorsomo...@gmail.com> Closes #7098 from mimaison/KAFKA-8599 --- .../kafka/clients/admin/KafkaAdminClient.java | 8 ++- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../apache/kafka/common/protocol/types/Struct.java | 1 + .../kafka/common/requests/AbstractResponse.java | 2 +- .../requests/ExpireDelegationTokenRequest.java | 77 +++++++--------------- .../requests/ExpireDelegationTokenResponse.java | 70 +++++--------------- .../kafka/common/requests/RequestResponseTest.java | 13 +++- core/src/main/scala/kafka/server/KafkaApis.scala | 7 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 8 ++- 9 files changed, 73 insertions(+), 119 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 8092eec..5256e36 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -73,6 +73,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicRe import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfigCollection; @@ -2473,8 +2474,11 @@ public class KafkaAdminClient extends AdminClient { new LeastLoadedNodeProvider()) { @Override - AbstractRequest.Builder createRequest(int timeoutMs) { - return new ExpireDelegationTokenRequest.Builder(hmac, options.expiryTimePeriodMs()); + AbstractRequest.Builder<ExpireDelegationTokenRequest> createRequest(int timeoutMs) { + return new ExpireDelegationTokenRequest.Builder( + new ExpireDelegationTokenRequestData() + .setHmac(hmac) + .setExpiryTimePeriodMs(options.expiryTimePeriodMs())); } @Override diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 3f0f5e8..e05f692 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -30,6 +30,8 @@ import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.FindCoordinatorResponseData; import org.apache.kafka.common.message.HeartbeatRequestData; @@ -93,8 +95,6 @@ import org.apache.kafka.common.requests.DescribeLogDirsRequest; import org.apache.kafka.common.requests.DescribeLogDirsResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; -import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; -import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.common.requests.LeaderAndIsrRequest; @@ -191,7 +191,7 @@ public enum ApiKeys { CreatePartitionsResponse.schemaVersions()), CREATE_DELEGATION_TOKEN(38, "CreateDelegationToken", CreateDelegationTokenRequestData.SCHEMAS, CreateDelegationTokenResponseData.SCHEMAS), RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()), - EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()), + EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequestData.SCHEMAS, ExpireDelegationTokenResponseData.SCHEMAS), DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()), DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequestData.SCHEMAS, DeleteGroupsResponseData.SCHEMAS), ELECT_LEADERS(43, "ElectLeaders", ElectLeadersRequestData.SCHEMAS, diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java index 3114aea..e47a2cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java @@ -290,6 +290,7 @@ public class Struct { ByteBuffer buf = (ByteBuffer) result; byte[] arr = new byte[buf.remaining()]; buf.get(arr); + buf.flip(); return arr; } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index eb52fb8..da2b837 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -151,7 +151,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case RENEW_DELEGATION_TOKEN: return new RenewDelegationTokenResponse(struct); case EXPIRE_DELEGATION_TOKEN: - return new ExpireDelegationTokenResponse(struct); + return new ExpireDelegationTokenResponse(struct, version); case DESCRIBE_DELEGATION_TOKEN: return new DescribeDelegationTokenResponse(struct); case DELETE_GROUPS: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java index 5b99676..ca6d2d6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java @@ -16,102 +16,69 @@ */ package org.apache.kafka.common.requests; +import java.nio.ByteBuffer; + +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import java.nio.ByteBuffer; - -import static org.apache.kafka.common.protocol.types.Type.BYTES; -import static org.apache.kafka.common.protocol.types.Type.INT64; - public class ExpireDelegationTokenRequest extends AbstractRequest { - private static final String HMAC_KEY_NAME = "hmac"; - private static final String EXPIRY_TIME_PERIOD_KEY_NAME = "expiry_time_period"; - private final ByteBuffer hmac; - private final long expiryTimePeriod; - - private static final Schema TOKEN_EXPIRE_REQUEST_V0 = new Schema( - new Field(HMAC_KEY_NAME, BYTES, "HMAC of the delegation token to be expired."), - new Field(EXPIRY_TIME_PERIOD_KEY_NAME, INT64, "expiry time period in milli seconds.")); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema TOKEN_EXPIRE_REQUEST_V1 = TOKEN_EXPIRE_REQUEST_V0; + private final ExpireDelegationTokenRequestData data; - private ExpireDelegationTokenRequest(short version, ByteBuffer hmac, long renewTimePeriod) { + private ExpireDelegationTokenRequest(ExpireDelegationTokenRequestData data, short version) { super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version); - - this.hmac = hmac; - this.expiryTimePeriod = renewTimePeriod; + this.data = data; } - public ExpireDelegationTokenRequest(Struct struct, short versionId) { - super(ApiKeys.EXPIRE_DELEGATION_TOKEN, versionId); - - hmac = struct.getBytes(HMAC_KEY_NAME); - expiryTimePeriod = struct.getLong(EXPIRY_TIME_PERIOD_KEY_NAME); + public ExpireDelegationTokenRequest(Struct struct, short version) { + super(ApiKeys.EXPIRE_DELEGATION_TOKEN, version); + this.data = new ExpireDelegationTokenRequestData(struct, version); } public static ExpireDelegationTokenRequest parse(ByteBuffer buffer, short version) { return new ExpireDelegationTokenRequest(ApiKeys.EXPIRE_DELEGATION_TOKEN.parseRequest(version, buffer), version); } - public static Schema[] schemaVersions() { - return new Schema[] {TOKEN_EXPIRE_REQUEST_V0, TOKEN_EXPIRE_REQUEST_V1}; - } - @Override protected Struct toStruct() { - short version = version(); - Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.requestSchema(version)); - - struct.set(HMAC_KEY_NAME, hmac); - struct.set(EXPIRY_TIME_PERIOD_KEY_NAME, expiryTimePeriod); - - return struct; + return data.toStruct(version()); } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - return new ExpireDelegationTokenResponse(throttleTimeMs, Errors.forException(e)); + return new ExpireDelegationTokenResponse( + new ExpireDelegationTokenResponseData() + .setErrorCode(Errors.forException(e).code()) + .setThrottleTimeMs(throttleTimeMs)); } public ByteBuffer hmac() { - return hmac; + return ByteBuffer.wrap(data.hmac()); } public long expiryTimePeriod() { - return expiryTimePeriod; + return data.expiryTimePeriodMs(); } public static class Builder extends AbstractRequest.Builder<ExpireDelegationTokenRequest> { - private final ByteBuffer hmac; - private final long expiryTimePeriod; + private final ExpireDelegationTokenRequestData data; - public Builder(byte[] hmac, long expiryTimePeriod) { + public Builder(ExpireDelegationTokenRequestData data) { super(ApiKeys.EXPIRE_DELEGATION_TOKEN); - this.hmac = ByteBuffer.wrap(hmac); - this.expiryTimePeriod = expiryTimePeriod; + this.data = data; } @Override public ExpireDelegationTokenRequest build(short version) { - return new ExpireDelegationTokenRequest(version, hmac, expiryTimePeriod); + return new ExpireDelegationTokenRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type: ExpireDelegationTokenRequest"). - append(", hmac=").append(hmac). - append(", expiryTimePeriod=").append(expiryTimePeriod). - append(")"); - return bld.toString(); + return data.toString(); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java index 9491a35..16a6e8c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenResponse.java @@ -16,92 +16,56 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; - import java.nio.ByteBuffer; +import java.util.Collections; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.types.Type.INT64; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; public class ExpireDelegationTokenResponse extends AbstractResponse { - private static final String EXPIRY_TIMESTAMP_KEY_NAME = "expiry_timestamp"; - - private final Errors error; - private final long expiryTimestamp; - private final int throttleTimeMs; + private final ExpireDelegationTokenResponseData data; - private static final Schema TOKEN_EXPIRE_RESPONSE_V0 = new Schema( - ERROR_CODE, - new Field(EXPIRY_TIMESTAMP_KEY_NAME, INT64, "timestamp (in msec) at which this token expires.."), - THROTTLE_TIME_MS); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema TOKEN_EXPIRE_RESPONSE_V1 = TOKEN_EXPIRE_RESPONSE_V0; - - public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error, long expiryTimestamp) { - this.throttleTimeMs = throttleTimeMs; - this.error = error; - this.expiryTimestamp = expiryTimestamp; - } - - public ExpireDelegationTokenResponse(int throttleTimeMs, Errors error) { - this(throttleTimeMs, error, -1); + public ExpireDelegationTokenResponse(ExpireDelegationTokenResponseData data) { + this.data = data; } - public ExpireDelegationTokenResponse(Struct struct) { - error = Errors.forCode(struct.get(ERROR_CODE)); - this.expiryTimestamp = struct.getLong(EXPIRY_TIMESTAMP_KEY_NAME); - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); + public ExpireDelegationTokenResponse(Struct struct, short version) { + this.data = new ExpireDelegationTokenResponseData(struct, version); } public static ExpireDelegationTokenResponse parse(ByteBuffer buffer, short version) { - return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer)); - } - - public static Schema[] schemaVersions() { - return new Schema[] {TOKEN_EXPIRE_RESPONSE_V0, TOKEN_EXPIRE_RESPONSE_V1}; + return new ExpireDelegationTokenResponse(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version).read(buffer), version); } public Errors error() { - return error; + return Errors.forCode(data.errorCode()); } public long expiryTimestamp() { - return expiryTimestamp; + return data.expiryTimestampMs(); } @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(error); + return Collections.singletonMap(error(), 1); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.EXPIRE_DELEGATION_TOKEN.responseSchema(version)); - - struct.set(ERROR_CODE, error.code()); - struct.set(EXPIRY_TIMESTAMP_KEY_NAME, expiryTimestamp); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - - return struct; + return data.toStruct(version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } public boolean hasError() { - return this.error != Errors.NONE; + return error() != Errors.NONE; } @Override diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 4218eff..0b8d98d 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -58,6 +58,8 @@ import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult; import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData; +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData; import org.apache.kafka.common.message.FindCoordinatorRequestData; import org.apache.kafka.common.message.HeartbeatRequestData; import org.apache.kafka.common.message.HeartbeatResponseData; @@ -1551,11 +1553,18 @@ public class RequestResponseTest { } private ExpireDelegationTokenRequest createExpireTokenRequest() { - return new ExpireDelegationTokenRequest.Builder("test".getBytes(), System.currentTimeMillis()).build(); + ExpireDelegationTokenRequestData data = new ExpireDelegationTokenRequestData() + .setHmac("test".getBytes()) + .setExpiryTimePeriodMs(System.currentTimeMillis()); + return new ExpireDelegationTokenRequest.Builder(data).build(); } private ExpireDelegationTokenResponse createExpireTokenResponse() { - return new ExpireDelegationTokenResponse(20, Errors.NONE, System.currentTimeMillis()); + ExpireDelegationTokenResponseData data = new ExpireDelegationTokenResponseData() + .setThrottleTimeMs(20) + .setErrorCode(Errors.NONE.code()) + .setExpiryTimestampMs(System.currentTimeMillis()); + return new ExpireDelegationTokenResponse(data); } private DescribeDelegationTokenRequest createDescribeTokenRequest() { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a88cd92..2b49982 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -59,6 +59,7 @@ import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicR import org.apache.kafka.common.message.DescribeGroupsResponseData import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult +import org.apache.kafka.common.message.ExpireDelegationTokenResponseData import org.apache.kafka.common.message.FindCoordinatorResponseData import org.apache.kafka.common.message.HeartbeatResponseData import org.apache.kafka.common.message.InitProducerIdResponseData @@ -2484,7 +2485,11 @@ class KafkaApis(val requestChannel: RequestChannel, trace("Sending expire token response for correlation id %d to client %s." .format(request.header.correlationId, request.header.clientId)) sendResponseMaybeThrottle(request, requestThrottleMs => - new ExpireDelegationTokenResponse(requestThrottleMs, error, expiryTimestamp)) + new ExpireDelegationTokenResponse( + new ExpireDelegationTokenResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setErrorCode(error.code) + .setExpiryTimestampMs(expiryTimestamp))) } if (!allowTokenRequests(request)) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index a8d29fd..db7ab69 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -33,6 +33,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, import org.apache.kafka.common.message.DeleteGroupsRequestData import org.apache.kafka.common.message.DeleteTopicsRequestData import org.apache.kafka.common.message.DescribeGroupsRequestData +import org.apache.kafka.common.message.ExpireDelegationTokenRequestData import org.apache.kafka.common.message.FindCoordinatorRequestData import org.apache.kafka.common.message.HeartbeatRequestData import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData @@ -444,7 +445,10 @@ class RequestQuotaTest extends BaseRequestTest { ) case ApiKeys.EXPIRE_DELEGATION_TOKEN => - new ExpireDelegationTokenRequest.Builder("".getBytes, 1000) + new ExpireDelegationTokenRequest.Builder( + new ExpireDelegationTokenRequestData() + .setHmac("".getBytes) + .setExpiryTimePeriodMs(1000L)) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => new DescribeDelegationTokenRequest.Builder(Collections.singletonList(SecurityUtils.parseKafkaPrincipal("User:test"))) @@ -573,7 +577,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.CREATE_PARTITIONS => new CreatePartitionsResponse(response).throttleTimeMs case ApiKeys.CREATE_DELEGATION_TOKEN => new CreateDelegationTokenResponse(response, ApiKeys.CREATE_DELEGATION_TOKEN.latestVersion).throttleTimeMs case ApiKeys.DESCRIBE_DELEGATION_TOKEN=> new DescribeDelegationTokenResponse(response).throttleTimeMs - case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response).throttleTimeMs + case ApiKeys.EXPIRE_DELEGATION_TOKEN => new ExpireDelegationTokenResponse(response, ApiKeys.EXPIRE_DELEGATION_TOKEN.latestVersion).throttleTimeMs case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs case ApiKeys.DELETE_GROUPS => new DeleteGroupsResponse(response).throttleTimeMs case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs