Repository: kafka Updated Branches: refs/heads/trunk 5916ef022 -> 015f1d738
http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 94de4b1..189575e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -23,7 +23,6 @@ import java.util.Map; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.CollectionUtils; @@ -138,12 +137,12 @@ public class OffsetFetchResponse extends AbstractResponse { } public static OffsetFetchResponse parse(ByteBuffer buffer, short version) { - return new OffsetFetchResponse(ProtoUtils.parseResponse(ApiKeys.OFFSET_FETCH.id, version, buffer)); + return new OffsetFetchResponse(ApiKeys.OFFSET_FETCH.parseResponse(version, buffer)); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version)); + Struct struct = new Struct(ApiKeys.OFFSET_FETCH.responseSchema(version)); Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData); List<Struct> topicArray = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java index df70e20..7b454e8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java @@ -17,7 +17,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.utils.CollectionUtils; @@ -107,7 +106,7 @@ public class ProduceRequest extends AbstractRequest { */ @Override public Struct toStruct() { - Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version())); + Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version())); Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords); struct.set(ACKS_KEY_NAME, acks); struct.set(TIMEOUT_KEY_NAME, timeout); @@ -150,7 +149,7 @@ public class ProduceRequest extends AbstractRequest { return new ProduceResponse(responseMap); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id))); + versionId, this.getClass().getSimpleName(), ApiKeys.PRODUCE.latestVersion())); } } @@ -170,7 +169,7 @@ public class ProduceRequest extends AbstractRequest { partitionRecords.clear(); } - public static ProduceRequest parse(ByteBuffer buffer, short versionId) { - return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer), versionId); + public static ProduceRequest parse(ByteBuffer buffer, short version) { + return new ProduceRequest(ApiKeys.PRODUCE.parseRequest(version, buffer), version); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 7a022af..8234732 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -15,7 +15,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.Record; import org.apache.kafka.common.utils.CollectionUtils; @@ -107,7 +106,7 @@ public class ProduceResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version)); + Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version)); Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses); List<Struct> topicDatas = new ArrayList<>(responseByTopic.size()); @@ -174,6 +173,6 @@ public class ProduceResponse extends AbstractResponse { } public static ProduceResponse parse(ByteBuffer buffer, short version) { - return new ProduceResponse(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version).read(buffer)); + return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java index a1f3f0e..9cc3f1f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java @@ -24,7 +24,6 @@ import java.util.List; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; @@ -44,7 +43,7 @@ public class SaslHandshakeRequest extends AbstractRequest { private final String mechanism; public SaslHandshakeRequest(String mechanism) { - super(ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)); + super(ApiKeys.SASL_HANDSHAKE.latestVersion()); this.mechanism = mechanism; } @@ -66,17 +65,17 @@ public class SaslHandshakeRequest extends AbstractRequest { return new SaslHandshakeResponse(Errors.forException(e), enabledMechanisms); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id))); + versionId, this.getClass().getSimpleName(), ApiKeys.SASL_HANDSHAKE.latestVersion())); } } - public static SaslHandshakeRequest parse(ByteBuffer buffer, short versionId) { - return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer), versionId); + public static SaslHandshakeRequest parse(ByteBuffer buffer, short version) { + return new SaslHandshakeRequest(ApiKeys.SASL_HANDSHAKE.parseRequest(version, buffer), version); } @Override protected Struct toStruct() { - Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SASL_HANDSHAKE.id, version())); + Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.requestSchema(version())); struct.set(MECHANISM_KEY_NAME, mechanism); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java index 9d38c6a..e1a4c87 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java @@ -25,7 +25,6 @@ import java.util.List; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; @@ -66,7 +65,7 @@ public class SaslHandshakeResponse extends AbstractResponse { @Override public Struct toStruct(short version) { - Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SASL_HANDSHAKE.id, version)); + Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version)); struct.set(ERROR_CODE_KEY_NAME, error.code()); struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray()); return struct; @@ -77,7 +76,7 @@ public class SaslHandshakeResponse extends AbstractResponse { } public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) { - return new SaslHandshakeResponse(ProtoUtils.parseResponse(ApiKeys.SASL_HANDSHAKE.id, version, buffer)); + return new SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java index 91806f1..7b79cd8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java @@ -16,7 +16,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.Utils; @@ -113,7 +112,7 @@ public class StopReplicaRequest extends AbstractRequest { return new StopReplicaResponse(Errors.NONE, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id))); + versionId, this.getClass().getSimpleName(), ApiKeys.STOP_REPLICA.latestVersion())); } } @@ -133,13 +132,13 @@ public class StopReplicaRequest extends AbstractRequest { return partitions; } - public static StopReplicaRequest parse(ByteBuffer buffer, short versionId) { - return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer), versionId); + public static StopReplicaRequest parse(ByteBuffer buffer, short version) { + return new StopReplicaRequest(ApiKeys.STOP_REPLICA.parseRequest(version, buffer), version); } @Override protected Struct toStruct() { - Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.STOP_REPLICA.id, version())); + Struct struct = new Struct(ApiKeys.STOP_REPLICA.requestSchema(version())); struct.set(CONTROLLER_ID_KEY_NAME, controllerId); struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java index 5ae5cc1..617b1c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java @@ -16,7 +16,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -69,13 +68,13 @@ public class StopReplicaResponse extends AbstractResponse { return error; } - public static StopReplicaResponse parse(ByteBuffer buffer, short versionId) { - return new StopReplicaResponse(ProtoUtils.parseResponse(ApiKeys.STOP_REPLICA.id, versionId, buffer)); + public static StopReplicaResponse parse(ByteBuffer buffer, short version) { + return new StopReplicaResponse(ApiKeys.STOP_REPLICA.parseResponse(version, buffer)); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.STOP_REPLICA.id, version)); + Struct struct = new Struct(ApiKeys.STOP_REPLICA.responseSchema(version)); List<Struct> responseDatas = new ArrayList<>(responses.size()); for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java index 7ad5c9a..b55ccff 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.Utils; @@ -108,7 +107,7 @@ public class SyncGroupRequest extends AbstractRequest { ByteBuffer.wrap(new byte[]{})); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SYNC_GROUP.id))); + versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion())); } } @@ -128,13 +127,13 @@ public class SyncGroupRequest extends AbstractRequest { return memberId; } - public static SyncGroupRequest parse(ByteBuffer buffer, short versionId) { - return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer), versionId); + public static SyncGroupRequest parse(ByteBuffer buffer, short version) { + return new SyncGroupRequest(ApiKeys.SYNC_GROUP.parseRequest(version, buffer), version); } @Override protected Struct toStruct() { - Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SYNC_GROUP.id, version())); + Struct struct = new Struct(ApiKeys.SYNC_GROUP.requestSchema(version())); struct.set(GROUP_ID_KEY_NAME, groupId); struct.set(GENERATION_ID_KEY_NAME, generationId); struct.set(MEMBER_ID_KEY_NAME, memberId); http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java index ff198aa..148815c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -62,14 +61,14 @@ public class SyncGroupResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SYNC_GROUP.id, version)); + Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version)); struct.set(ERROR_CODE_KEY_NAME, error.code()); struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState); return struct; } public static SyncGroupResponse parse(ByteBuffer buffer, short version) { - return new SyncGroupResponse(ProtoUtils.parseResponse(ApiKeys.SYNC_GROUP.id, version, buffer)); + return new SyncGroupResponse(ApiKeys.SYNC_GROUP.parseResponse(version, buffer)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java index 8dd852d..98bf83b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.utils.Utils; @@ -226,7 +225,7 @@ public class UpdateMetadataRequest extends AbstractRequest { @Override protected Struct toStruct() { short version = version(); - Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)); + Struct struct = new Struct(ApiKeys.UPDATE_METADATA_KEY.requestSchema(version)); struct.set(CONTROLLER_ID_KEY_NAME, controllerId); struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch); @@ -288,7 +287,7 @@ public class UpdateMetadataRequest extends AbstractRequest { return new UpdateMetadataResponse(Errors.forException(e)); else throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id))); + versionId, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA_KEY.latestVersion())); } public int controllerId() { @@ -307,9 +306,8 @@ public class UpdateMetadataRequest extends AbstractRequest { return liveBrokers; } - public static UpdateMetadataRequest parse(ByteBuffer buffer, short versionId) { - return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer), - versionId); + public static UpdateMetadataRequest parse(ByteBuffer buffer, short version) { + return new UpdateMetadataRequest(ApiKeys.UPDATE_METADATA_KEY.parseRequest(version, buffer), version); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java index 0032fca..5b8b46d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java @@ -15,7 +15,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -44,12 +43,12 @@ public class UpdateMetadataResponse extends AbstractResponse { } public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) { - return new UpdateMetadataResponse(ProtoUtils.parseResponse(ApiKeys.UPDATE_METADATA_KEY.id, version, buffer)); + return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA_KEY.parseResponse(version, buffer)); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.UPDATE_METADATA_KEY.id, version)); + Struct struct = new Struct(ApiKeys.UPDATE_METADATA_KEY.responseSchema(version)); struct.set(ERROR_CODE_KEY_NAME, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index c89cc24..eb6a259 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -21,7 +21,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.requests.ApiVersionsResponse; @@ -134,7 +133,7 @@ public class NetworkClientTest { networkClient.poll(1, time.milliseconds()); assertEquals(1, networkClient.inFlightRequestCount()); ResponseHeader respHeader = new ResponseHeader(request.correlationId()); - Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id)); + Struct resp = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion())); resp.set("responses", new Object[0]); Struct responseHeaderStruct = respHeader.toStruct(); int size = responseHeaderStruct.sizeOf() + resp.sizeOf(); http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java index 53c47c8..9f0868f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.clients; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion; import org.junit.Assert; @@ -64,8 +63,7 @@ public class NodeApiVersionsTest { } else if (apiKey == ApiKeys.DELETE_TOPICS) { versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001)); } else { - versionList.add(new ApiVersion(apiKey.id, - ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id))); + versionList.add(new ApiVersion(apiKey)); } } NodeApiVersions versions = new NodeApiVersions(versionList); @@ -80,15 +78,15 @@ public class NodeApiVersionsTest { } else { bld.append(apiKey.name).append("("). append(apiKey.id).append("): "); - if (ProtoUtils.oldestVersion(apiKey.id) == - ProtoUtils.latestVersion(apiKey.id)) { - bld.append(ProtoUtils.oldestVersion(apiKey.id)); + if (apiKey.oldestVersion() == + apiKey.latestVersion()) { + bld.append(apiKey.oldestVersion()); } else { - bld.append(ProtoUtils.oldestVersion(apiKey.id)). + bld.append(apiKey.oldestVersion()). append(" to "). - append(ProtoUtils.latestVersion(apiKey.id)); + append(apiKey.latestVersion()); } - bld.append(" [usable: ").append(ProtoUtils.latestVersion(apiKey.id)). + bld.append(" [usable: ").append(apiKey.latestVersion()). append("]"); } prefix = ", "; @@ -129,7 +127,7 @@ public class NodeApiVersionsTest { versionList.add(new ApiVersion((short) 100, (short) 0, (short) 1)); NodeApiVersions versions = new NodeApiVersions(versionList); for (ApiKeys apiKey: ApiKeys.values()) { - assertEquals(ProtoUtils.latestVersion(apiKey.id), versions.usableVersion(apiKey)); + assertEquals(apiKey.latestVersion(), versions.usableVersion(apiKey)); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java index f177ae6..8917a22 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java @@ -31,4 +31,9 @@ public class ApiKeysTest { ApiKeys.forId(10000); } + @Test(expected = IllegalArgumentException.class) + public void schemaVersionOutOfRange() { + ApiKeys.PRODUCE.requestSchema((short) Protocol.REQUESTS[ApiKeys.PRODUCE.id].length); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java deleted file mode 100644 index 440ca49..0000000 --- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.protocol; - -import org.junit.Test; - -public class ProtoUtilsTest { - @Test(expected = IllegalArgumentException.class) - public void schemaVersionOutOfRange() { - ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, Protocol.REQUESTS[ApiKeys.PRODUCE.id].length); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index d0b9639..1367ba1 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.MemoryRecords; @@ -169,7 +168,7 @@ public class RequestResponseTest { } private void checkOlderFetchVersions() throws Exception { - int latestVersion = ProtoUtils.latestVersion(ApiKeys.FETCH.id); + int latestVersion = ApiKeys.FETCH.latestVersion(); for (int i = 0; i < latestVersion; ++i) { checkErrorResponse(createFetchRequest(i), new UnknownServerException()); checkRequest(createFetchRequest(i)); @@ -220,11 +219,11 @@ public class RequestResponseTest { assertEquals("Throttle time must be zero", 0, v0Response.getThrottleTime()); assertEquals("Throttle time must be 10", 10, v1Response.getThrottleTime()); assertEquals("Throttle time must be 10", 10, v2Response.getThrottleTime()); - assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0), + assertEquals("Should use schema version 0", ApiKeys.PRODUCE.responseSchema((short) 0), v0Response.toStruct((short) 0).schema()); - assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 1), + assertEquals("Should use schema version 1", ApiKeys.PRODUCE.responseSchema((short) 1), v1Response.toStruct((short) 1).schema()); - assertEquals("Should use schema version 2", ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 2), + assertEquals("Should use schema version 2", ApiKeys.PRODUCE.responseSchema((short) 2), v2Response.toStruct((short) 2).schema()); assertEquals("Response data does not match", responseData, v0Response.responses()); assertEquals("Response data does not match", responseData, v1Response.responses()); @@ -242,9 +241,9 @@ public class RequestResponseTest { FetchResponse v1Response = new FetchResponse(responseData, 10); assertEquals("Throttle time must be zero", 0, v0Response.throttleTimeMs()); assertEquals("Throttle time must be 10", 10, v1Response.throttleTimeMs()); - assertEquals("Should use schema version 0", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0), + assertEquals("Should use schema version 0", ApiKeys.FETCH.responseSchema((short) 0), v0Response.toStruct((short) 0).schema()); - assertEquals("Should use schema version 1", ProtoUtils.responseSchema(ApiKeys.FETCH.id, 1), + assertEquals("Should use schema version 1", ApiKeys.FETCH.responseSchema((short) 1), v1Response.toStruct((short) 1).schema()); assertEquals("Response data does not match", responseData, v0Response.responseData()); assertEquals("Response data does not match", responseData, v1Response.responseData()); @@ -253,7 +252,7 @@ public class RequestResponseTest { @Test public void verifyFetchResponseFullWrite() throws Exception { FetchResponse fetchResponse = createFetchResponse(); - RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, ProtoUtils.latestVersion(ApiKeys.FETCH.id), + RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, ApiKeys.FETCH.latestVersion(), "client", 15); Send send = fetchResponse.toSend("1", header); @@ -272,7 +271,7 @@ public class RequestResponseTest { assertEquals(header.correlationId(), responseHeader.correlationId()); // read the body - Struct responseBody = ProtoUtils.responseSchema(ApiKeys.FETCH.id, header.apiVersion()).read(buf); + Struct responseBody = ApiKeys.FETCH.responseSchema(header.apiVersion()).read(buf); assertEquals(fetchResponse.toStruct(header.apiVersion()), responseBody); assertEquals(size, responseHeader.sizeOf() + responseBody.sizeOf()); @@ -281,7 +280,7 @@ public class RequestResponseTest { @Test public void testControlledShutdownResponse() { ControlledShutdownResponse response = createControlledShutdownResponse(); - short version = ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id); + short version = ApiKeys.CONTROLLED_SHUTDOWN_KEY.latestVersion(); Struct struct = response.toStruct(version); ByteBuffer buffer = toBuffer(struct); ControlledShutdownResponse deserialized = ControlledShutdownResponse.parse(buffer, version); @@ -403,11 +402,11 @@ public class RequestResponseTest { Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap( new TopicPartition("test", 0), new ListOffsetRequest.PartitionData(1000000L, 10)); - return ListOffsetRequest.Builder.forConsumer((short) 0).setOffsetData(offsetData).build((short) version); + return ListOffsetRequest.Builder.forConsumer(false).setOffsetData(offsetData).build((short) version); } else if (version == 1) { Map<TopicPartition, Long> offsetData = Collections.singletonMap( new TopicPartition("test", 0), 1000000L); - return ListOffsetRequest.Builder.forConsumer((short) 1).setTargetTimes(offsetData).build((short) version); + return ListOffsetRequest.Builder.forConsumer(true).setTargetTimes(offsetData).build((short) version); } else { throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version); } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java index 3a9e0ce..8a93532 100644 --- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.ProtoUtils; import org.apache.kafka.common.protocol.SecurityProtocol; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -758,8 +757,8 @@ public class SaslAuthenticatorTest { // Send ApiVersionsRequest and check response ApiVersionsResponse versionsResponse = sendVersionRequestReceiveResponse(node); - assertEquals(ProtoUtils.oldestVersion(ApiKeys.SASL_HANDSHAKE.id), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion); - assertEquals(ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion); + assertEquals(ApiKeys.SASL_HANDSHAKE.oldestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion); + assertEquals(ApiKeys.SASL_HANDSHAKE.latestVersion(), versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion); // Send SaslHandshakeRequest and check response SaslHandshakeResponse handshakeResponse = sendHandshakeRequestReceiveResponse(node); http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4f71258..3368c09 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.errors._ -import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, SecurityProtocol} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests._ import CreateTopicsRequest.TopicDetails import org.apache.kafka.common.security.auth.KafkaPrincipal @@ -191,12 +191,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def createFetchRequest = { val partitionMap = new util.LinkedHashMap[TopicPartition, requests.FetchRequest.PartitionData] partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 100)) - val version = ProtoUtils.latestVersion(ApiKeys.FETCH.id) + val version = ApiKeys.FETCH.latestVersion requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, partitionMap).build() } private def createListOffsetsRequest = { - requests.ListOffsetRequest.Builder.forConsumer(0).setTargetTimes( + requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes( Map(tp -> (0L: java.lang.Long)).asJava). build() } @@ -215,7 +215,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId, Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, securityProtocol, ListenerName.forSecurityProtocol(securityProtocol))).asJava, null)).asJava - val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion new requests.UpdateMetadataRequest.Builder(version, brokerId, Int.MaxValue, partitionState, brokers).build() } @@ -773,7 +773,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { @Test def testUnauthorizedDeleteWithoutDescribe() { val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) - val version = ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id) + val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, deleteResponse.errors.asScala.head._2) } @@ -782,7 +782,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def testUnauthorizedDeleteWithDescribe() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), deleteTopicResource) val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) - val version = ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id) + val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2) @@ -792,7 +792,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { def testDeleteWithWildCardAuth() { addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)), new Resource(Topic, "*")) val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) - val version = ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id) + val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2) http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index 448fce1..b82ddf9 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -22,7 +22,7 @@ import java.util.Properties import kafka.network.SocketServer import kafka.utils.TestUtils import org.apache.kafka.common.protocol.types.Struct -import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{CreateTopicsRequest, CreateTopicsResponse, MetadataRequest, MetadataResponse} import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue} @@ -147,9 +147,8 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { } protected def sendMetadataRequest(request: MetadataRequest, destination: SocketServer = anySocketServer): MetadataResponse = { - val version = ProtoUtils.latestVersion(ApiKeys.METADATA.id) val response = connectAndSend(request, ApiKeys.METADATA, destination = destination) - MetadataResponse.parse(response, version) + MetadataResponse.parse(response, ApiKeys.METADATA.latestVersion) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala index 981ed6a..2555a91 100644 --- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala @@ -18,7 +18,7 @@ package kafka.server import org.apache.kafka.common.requests.ApiVersionsResponse -import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, Protocol} +import org.apache.kafka.common.protocol.{ApiKeys, Protocol} import org.junit.Assert._ import org.junit.Test @@ -32,8 +32,8 @@ class ApiVersionsTest { for (key <- ApiKeys.values) { val version = ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(key.id) assertNotNull(s"Could not find ApiVersion for API ${key.name}", version) - assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, ProtoUtils.oldestVersion(key.id)) - assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, ProtoUtils.latestVersion(key.id)) + assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, key.oldestVersion) + assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, key.latestVersion) // Check if versions less than min version are indeed set as null, i.e., deprecated. for (i <- 0 until version.minVersion) { http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 64be5b3..5616956 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -24,7 +24,7 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils} +import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.LogEntry import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.serialization.StringSerializer @@ -70,7 +70,7 @@ class FetchRequestTest extends BaseRequestTest { private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = { val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId)) - FetchResponse.parse(response, ProtoUtils.latestVersion(ApiKeys.FETCH.id)) + FetchResponse.parse(response, ApiKeys.FETCH.latestVersion) } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 1b5007d..57c1846 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -22,7 +22,7 @@ import util.Arrays.asList import kafka.common.BrokerEndPointNotAvailableException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, SecurityProtocol} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest} import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint} import org.junit.Test @@ -69,7 +69,7 @@ class MetadataCacheTest { new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1), zkVersion, asSet(1)), new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 2, asList(2), zkVersion, asSet(2))) - val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -121,7 +121,7 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asSet(0))) - val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -165,7 +165,7 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas)) - val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -225,7 +225,7 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas)) - val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -277,7 +277,7 @@ class MetadataCacheTest { val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) - val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) @@ -310,7 +310,7 @@ class MetadataCacheTest { val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) - val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest)