Repository: kafka Updated Branches: refs/heads/0.11.0 1b15adde1 -> 012332042
KAFKA-5128; Check inter broker version in transactional methods Add check in `KafkaApis` that the inter broker protocol version is at least `KAFKA_0_11_0_IV0`, i.e., supporting transactions Author: Damian Guy <damian....@gmail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3103 from dguy/kafka-5128 (cherry picked from commit 7892b4e6c7c32be09d78a8bbbeeaa823d3197aaa) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/01233204 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/01233204 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/01233204 Branch: refs/heads/0.11.0 Commit: 0123320426341fe50d4124f0ef398d7f5aaee909 Parents: 1b15add Author: Damian Guy <damian....@gmail.com> Authored: Fri May 26 09:52:47 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Fri May 26 09:54:21 2017 -0700 ---------------------------------------------------------------------- .../apache/kafka/common/protocol/ApiKeys.java | 71 +++++++----- .../common/requests/ApiVersionsResponse.java | 18 +-- .../clients/consumer/internals/FetcherTest.java | 2 +- .../clients/producer/internals/SenderTest.java | 2 +- .../requests/ApiVersionsResponseTest.java | 68 ++++++++++++ .../transaction/TransactionStateManager.scala | 1 - .../src/main/scala/kafka/server/KafkaApis.scala | 14 ++- .../scala/unit/kafka/server/KafkaApisTest.scala | 111 +++++++++++++++++++ 8 files changed, 246 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 36f6403..721a610 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -19,6 +19,7 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; @@ -26,25 +27,25 @@ import java.nio.ByteBuffer; * Identifiers for all the Kafka APIs */ public enum ApiKeys { - PRODUCE(0, "Produce", false), - FETCH(1, "Fetch", false), - LIST_OFFSETS(2, "Offsets", false), - METADATA(3, "Metadata", false), + PRODUCE(0, "Produce"), + FETCH(1, "Fetch"), + LIST_OFFSETS(2, "Offsets"), + METADATA(3, "Metadata"), LEADER_AND_ISR(4, "LeaderAndIsr", true), STOP_REPLICA(5, "StopReplica", true), UPDATE_METADATA_KEY(6, "UpdateMetadata", true), CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true), - OFFSET_COMMIT(8, "OffsetCommit", false), - OFFSET_FETCH(9, "OffsetFetch", false), - FIND_COORDINATOR(10, "FindCoordinator", false), - JOIN_GROUP(11, "JoinGroup", false), - HEARTBEAT(12, "Heartbeat", false), - LEAVE_GROUP(13, "LeaveGroup", false), - SYNC_GROUP(14, "SyncGroup", false), - DESCRIBE_GROUPS(15, "DescribeGroups", false), - LIST_GROUPS(16, "ListGroups", false), - SASL_HANDSHAKE(17, "SaslHandshake", false), - API_VERSIONS(18, "ApiVersions", false) { + OFFSET_COMMIT(8, "OffsetCommit"), + OFFSET_FETCH(9, "OffsetFetch"), + FIND_COORDINATOR(10, "FindCoordinator"), + JOIN_GROUP(11, "JoinGroup"), + HEARTBEAT(12, "Heartbeat"), + LEAVE_GROUP(13, "LeaveGroup"), + SYNC_GROUP(14, "SyncGroup"), + DESCRIBE_GROUPS(15, "DescribeGroups"), + LIST_GROUPS(16, "ListGroups"), + SASL_HANDSHAKE(17, "SaslHandshake"), + API_VERSIONS(18, "ApiVersions") { @Override public Struct parseResponse(short version, 
ByteBuffer buffer) { // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest @@ -53,21 +54,21 @@ public enum ApiKeys { return parseResponse(version, buffer, (short) 0); } }, - CREATE_TOPICS(19, "CreateTopics", false), - DELETE_TOPICS(20, "DeleteTopics", false), - DELETE_RECORDS(21, "DeleteRecords", false), - INIT_PRODUCER_ID(22, "InitProducerId", false), + CREATE_TOPICS(19, "CreateTopics"), + DELETE_TOPICS(20, "DeleteTopics"), + DELETE_RECORDS(21, "DeleteRecords"), + INIT_PRODUCER_ID(22, "InitProducerId"), OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true), - ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false), - ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false), - END_TXN(26, "EndTxn", false), - WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true), - TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false), - DESCRIBE_ACLS(29, "DescribeAcls", false), - CREATE_ACLS(30, "CreateAcls", false), - DELETE_ACLS(31, "DeleteAcls", false), - DESCRIBE_CONFIGS(32, "DescribeConfigs", false), - ALTER_CONFIGS(33, "AlterConfigs", false); + ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2), + ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2), + END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2), + WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2), + TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2), + DESCRIBE_ACLS(29, "DescribeAcls"), + CREATE_ACLS(30, "CreateAcls"), + DELETE_ACLS(31, "DeleteAcls"), + DESCRIBE_CONFIGS(32, "DescribeConfigs"), + ALTER_CONFIGS(33, "AlterConfigs"); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; @@ -93,12 +94,24 @@ public enum ApiKeys { /** indicates if this is a ClusterAction request used only by brokers */ public final boolean clusterAction; + /** indicates the minimum required inter broker magic required to support the API */ + public final byte 
minRequiredInterBrokerMagic; + + ApiKeys(int id, String name) { + this(id, name, false); + } + ApiKeys(int id, String name, boolean clusterAction) { + this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0); + } + + ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic) { if (id < 0) throw new IllegalArgumentException("id must not be negative, id: " + id); this.id = (short) id; this.name = name; this.clusterAction = clusterAction; + this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic; } public static ApiKeys forId(int id) { http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 6f921a7..e9d5023 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.record.RecordBatch; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -31,7 +32,8 @@ import java.util.Map; public class ApiVersionsResponse extends AbstractResponse { - public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME); + public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE); + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final String ERROR_CODE_KEY_NAME = "error_code"; public static 
final String API_VERSIONS_KEY_NAME = "api_versions"; public static final String API_KEY_NAME = "api_key"; @@ -114,11 +116,11 @@ public class ApiVersionsResponse extends AbstractResponse { return struct; } - public static ApiVersionsResponse apiVersionsResponse(short version, int throttleTimeMs) { - if (throttleTimeMs == 0 || version == 0) + public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) { + if (maxMagic == RecordBatch.CURRENT_MAGIC_VALUE && throttleTimeMs == DEFAULT_THROTTLE_TIME) { return API_VERSIONS_RESPONSE; - else - return createApiVersionsResponse(throttleTimeMs); + } + return createApiVersionsResponse(throttleTimeMs, maxMagic); } /** @@ -150,10 +152,12 @@ public class ApiVersionsResponse extends AbstractResponse { return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer)); } - public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs) { + public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs, final byte minMagic) { List<ApiVersion> versionList = new ArrayList<>(); for (ApiKeys apiKey : ApiKeys.values()) { - versionList.add(new ApiVersion(apiKey)); + if (apiKey.minRequiredInterBrokerMagic <= minMagic) { + versionList.add(new ApiVersion(apiKey)); + } } return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList); } http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index a81dc58..ba5b7d5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1051,7 +1051,7 @@ public 
class FetcherTest { time, true, new ApiVersions(), throttleTimeSensor); short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion(); - ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0)); + ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0)); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); while (!client.ready(node, time.milliseconds())) client.poll(1, time.milliseconds()); http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 50c4cd4..c08ea57 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -229,7 +229,7 @@ public class SenderTest { time, true, new ApiVersions(), throttleTimeSensor); short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion(); - ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0)); + ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400, RecordBatch.CURRENT_MAGIC_VALUE).serialize(apiVersionsResponseVersion, new ResponseHeader(0)); selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); while (!client.ready(node, time.milliseconds())) client.poll(1, time.milliseconds()); 
http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java new file mode 100644 index 0000000..1e8e3b4 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.Utils; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class ApiVersionsResponseTest { + + @Test + public void shouldCreateApiResponseOnlyWithKeysSupportedByMagicValue() throws Exception { + final ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(10, RecordBatch.MAGIC_VALUE_V1); + verifyApiKeysForMagic(response, RecordBatch.MAGIC_VALUE_V1); + assertEquals(10, response.throttleTimeMs()); + } + + @Test + public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() throws Exception { + assertEquals(apiKeysInResponse(ApiVersionsResponse.API_VERSIONS_RESPONSE), Utils.mkSet(ApiKeys.values())); + } + + @Test + public void shouldReturnAllKeysWhenMagicIsCurrentValueAndThrottleMsIsDefaultThrottle() throws Exception { + ApiVersionsResponse response = ApiVersionsResponse.apiVersionsResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, RecordBatch.CURRENT_MAGIC_VALUE); + assertEquals(Utils.mkSet(ApiKeys.values()), apiKeysInResponse(response)); + assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs()); + } + + private void verifyApiKeysForMagic(final ApiVersionsResponse response, final byte maxMagic) { + for (final ApiVersionsResponse.ApiVersion version : response.apiVersions()) { + assertTrue(ApiKeys.forId(version.apiKey).minRequiredInterBrokerMagic <= maxMagic); + } + } + + private Set<ApiKeys> apiKeysInResponse(final ApiVersionsResponse apiVersions) { + final Set<ApiKeys> apiKeys = new HashSet<>(); + for (final ApiVersionsResponse.ApiVersion version : apiVersions.apiVersions()) { + apiKeys.add(ApiKeys.forId(version.apiKey)); + } + return apiKeys; + } + + +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala index 0d7b5c4..19b9b91 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala @@ -439,7 +439,6 @@ class TransactionStateManager(brokerId: Int, throw new KafkaException(s"Transaction topic number of partitions has changed from $transactionTopicPartitionCount to $curTransactionTopicPartitionCount") } - // TODO: check broker message format and error if < V2 def appendTransactionToLog(transactionalId: String, coordinatorEpoch: Int, newMetadata: TxnTransitMetadata, http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 380685f..473d108 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger import kafka.admin.{AdminUtils, RackAwareMode} -import kafka.api.{ControlledShutdownRequest, ControlledShutdownResponse} +import kafka.api.{ApiVersion, ControlledShutdownRequest, ControlledShutdownResponse, KAFKA_0_11_0_IV0} import kafka.cluster.Partition import kafka.common.{KafkaStorageException, OffsetAndMetadata, OffsetMetadata, TopicAndPartition} import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota} @@ -1277,7 +1277,7 @@ class KafkaApis(val requestChannel: 
RequestChannel, def sendResponseCallback(requestThrottleMs: Int) { val responseSend = if (Protocol.apiVersionSupported(ApiKeys.API_VERSIONS.id, request.header.apiVersion)) - ApiVersionsResponse.apiVersionsResponse(request.header.apiVersion, requestThrottleMs).toSend(request.connectionId, request.header) + ApiVersionsResponse.apiVersionsResponse(requestThrottleMs, config.interBrokerProtocolVersion.messageFormatVersion).toSend(request.connectionId, request.header) else ApiVersionsResponse.unsupportedVersionSend(request.connectionId, request.header) requestChannel.sendResponse(RequestChannel.Response(request, responseSend)) } @@ -1453,6 +1453,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleEndTxnRequest(request: RequestChannel.Request): Unit = { + ensureInterBrokerVersion(KAFKA_0_11_0_IV0) val endTxnRequest = request.body[EndTxnRequest] val transactionalId = endTxnRequest.transactionalId @@ -1477,6 +1478,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit = { + ensureInterBrokerVersion(KAFKA_0_11_0_IV0) authorizeClusterAction(request) val writeTxnMarkersRequest = request.body[WriteTxnMarkersRequest] val errors = new ConcurrentHashMap[java.lang.Long, java.util.Map[TopicPartition, Errors]]() @@ -1538,7 +1540,13 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def ensureInterBrokerVersion(version: ApiVersion): Unit = { + if (config.interBrokerProtocolVersion < version) + throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") + } + def handleAddPartitionToTxnRequest(request: RequestChannel.Request): Unit = { + ensureInterBrokerVersion(KAFKA_0_11_0_IV0) val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] val transactionalId = addPartitionsToTxnRequest.transactionalId val partitionsToAdd = addPartitionsToTxnRequest.partitions @@ 
-1593,6 +1601,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit = { + ensureInterBrokerVersion(KAFKA_0_11_0_IV0) val addOffsetsToTxnRequest = request.body[AddOffsetsToTxnRequest] val transactionalId = addOffsetsToTxnRequest.transactionalId val groupId = addOffsetsToTxnRequest.consumerGroupId @@ -1624,6 +1633,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleTxnOffsetCommitRequest(request: RequestChannel.Request): Unit = { + ensureInterBrokerVersion(KAFKA_0_11_0_IV0) val header = request.header val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest] http://git-wip-us.apache.org/repos/asf/kafka/blob/01233204/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala new file mode 100644 index 0000000..d22a5a0 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package unit.kafka.server
+
+
+import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_0_11_0_IV0}
+import kafka.controller.KafkaController
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.TransactionCoordinator
+import kafka.network.RequestChannel
+import kafka.security.auth.Authorizer
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server._
+import kafka.utils.{MockTime, TestUtils, ZkUtils}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.UnsupportedVersionException
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{AbstractRequestResponse, AddPartitionsToTxnRequest, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Utils
+import org.easymock.EasyMock
+import org.junit.{Before, Test}
+
+import scala.collection.JavaConverters
+
+/** Expects each transactional KafkaApis handler to throw UnsupportedVersionException on a 0.10.2 inter-broker protocol; requests are null. */
+class KafkaApisTest {
+
+  private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+  private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+  private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
+  private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
+  private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
+  private val controller = EasyMock.createNiceMock(classOf[KafkaController])
+  private val zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+  private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+  private val metrics = new Metrics()
+  private val brokerId = 1
+  private val authorizer: Option[Authorizer] = None
+  private val quotas = EasyMock.createNiceMock(classOf[QuotaManagers])
+  private val brokerTopicStats = new BrokerTopicStats
+  private val clusterId = "clusterId"
+  private val time = new MockTime
+
+  // Builds a KafkaApis over the mocks above; both the inter-broker protocol version and the
+  // log message format version in its KafkaConfig are set to the given version.
+  def createKafkaApis(interBrokerProtocolVersion: ApiVersion): KafkaApis = {
+    val properties = TestUtils.createBrokerConfig(brokerId, "zk")
+    properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
+    properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
+    new KafkaApis(requestChannel,
+      replicaManager,
+      adminManager,
+      groupCoordinator,
+      txnCoordinator,
+      controller,
+      zkUtils,
+      brokerId,
+      new KafkaConfig(properties),
+      metadataCache,
+      metrics,
+      authorizer,
+      quotas,
+      brokerTopicStats,
+      clusterId,
+      time
+    )
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleAddPartitionToTxnRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleTxnOffsetCommitRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleEndTxnRequest(null)
+  }
+
+  @Test(expected = classOf[UnsupportedVersionException])
+  def shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported(): Unit = {
+    createKafkaApis(KAFKA_0_10_2_IV0).handleWriteTxnMarkersRequest(null)
+  }
+
+}