Repository: kafka Updated Branches: refs/heads/trunk 1f2ee5f0a -> fc1cfe475
http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index b864e5d..a26bc2e 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -26,8 +26,9 @@ import kafka.integration.KafkaServerTestHarness import kafka.network.SocketServer import kafka.utils._ import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.types.Struct import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol} -import org.apache.kafka.common.requests.{AbstractRequest, RequestHeader, ResponseHeader} +import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse, RequestHeader, ResponseHeader} abstract class BaseRequestTest extends KafkaServerTestHarness { private var correlationId = 0 @@ -97,41 +98,64 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { } /** - * - * @param request - * @param apiKey * @param destination An optional SocketServer ot send the request to. If not set, any available server is used. * @param protocol An optional SecurityProtocol to use. If not set, PLAINTEXT is used. - * @return + * @return A ByteBuffer containing the response (without the response header) */ - def send(request: AbstractRequest, apiKey: ApiKeys, - destination: SocketServer = anySocketServer, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = { + def connectAndSend(request: AbstractRequest, apiKey: ApiKeys, + destination: SocketServer = anySocketServer, + apiVersion: Option[Short] = None, + protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = { val socket = connect(destination, protocol) - try { - send(request, apiKey, socket) - } finally { - socket.close() - } + try send(request, apiKey, socket, apiVersion) + finally socket.close() } /** - * Serializes and send the request to the given api. + * @param destination An optional SocketServer ot send the request to. If not set, any available server is used. + * @param protocol An optional SecurityProtocol to use. If not set, PLAINTEXT is used. + * @return A ByteBuffer containing the response (without the response header). + */ + def connectAndSendStruct(requestStruct: Struct, apiKey: ApiKeys, apiVersion: Short, + destination: SocketServer = anySocketServer, + protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): ByteBuffer = { + val socket = connect(destination, protocol) + try sendStruct(requestStruct, apiKey, socket, apiVersion) + finally socket.close() + } + + /** + * Serializes and sends the request to the given api. * A ByteBuffer containing the response is returned. */ - def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket): ByteBuffer = { - correlationId += 1 - val serializedBytes = { - val header = new RequestHeader(apiKey.id, request.version, "client-id", correlationId) - val byteBuffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf) - header.writeTo(byteBuffer) - request.writeTo(byteBuffer) - byteBuffer.array() - } + def send(request: AbstractRequest, apiKey: ApiKeys, socket: Socket, apiVersion: Option[Short] = None): ByteBuffer = { + val header = nextRequestHeader(apiKey, apiVersion.getOrElse(request.version)) + val serializedBytes = request.serialize(header).array + val response = requestAndReceive(socket, serializedBytes) + skipResponseHeader(response) + } + /** + * Serializes and sends the requestStruct to the given api. + * A ByteBuffer containing the response (without the response header) is returned. + */ + def sendStruct(requestStruct: Struct, apiKey: ApiKeys, socket: Socket, apiVersion: Short): ByteBuffer = { + val header = nextRequestHeader(apiKey, apiVersion) + val serializedBytes = AbstractRequestResponse.serialize(header.toStruct, requestStruct).array val response = requestAndReceive(socket, serializedBytes) + skipResponseHeader(response) + } + private def skipResponseHeader(response: Array[Byte]): ByteBuffer = { val responseBuffer = ByteBuffer.wrap(response) - ResponseHeader.parse(responseBuffer) // Parse the header to ensure its valid and move the buffer forward + // Parse the header to ensure its valid and move the buffer forward + ResponseHeader.parse(responseBuffer) responseBuffer } + + def nextRequestHeader(apiKey: ApiKeys, apiVersion: Short): RequestHeader = { + correlationId += 1 + new RequestHeader(apiKey.id, apiVersion, "client-id", correlationId) + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala index 6efa189..4ab9520 100644 --- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestTest.scala @@ -116,30 +116,27 @@ class CreateTopicsRequestTest extends AbstractCreateTopicsRequestTest { // Duplicate val singleRequest = new CreateTopicsRequest.Builder(Map("duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build() - val duplicateRequest = duplicateFirstTopic(singleRequest) - assertFalse("Request doesn't have duplicate topics", duplicateRequest.duplicateTopics().isEmpty) - validateErrorCreateTopicsRequests(duplicateRequest, Map("duplicate-topic" -> error(Errors.INVALID_REQUEST, - Some("""Create topics request from client `client-id` contains multiple entries for the following topics: duplicate-topic""")))) + validateErrorCreateTopicsRequests(singleRequest, Map("duplicate-topic" -> error(Errors.INVALID_REQUEST, + Some("""Create topics request from client `client-id` contains multiple entries for the following topics: duplicate-topic"""))), + requestStruct = Some(toStructWithDuplicateFirstTopic(singleRequest))) // Duplicate Partial with validateOnly val doubleRequestValidateOnly = new CreateTopicsRequest.Builder(Map( "duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort), "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000, true).build() - val duplicateDoubleRequestValidateOnly = duplicateFirstTopic(doubleRequestValidateOnly) - assertFalse("Request doesn't have duplicate topics", duplicateDoubleRequestValidateOnly.duplicateTopics.isEmpty) - validateErrorCreateTopicsRequests(duplicateDoubleRequestValidateOnly, Map( + validateErrorCreateTopicsRequests(doubleRequestValidateOnly, Map( "duplicate-topic" -> error(Errors.INVALID_REQUEST), - "other-topic" -> error(Errors.NONE)), checkErrorMessage = false) + "other-topic" -> error(Errors.NONE)), checkErrorMessage = false, + requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequestValidateOnly))) // Duplicate Partial val doubleRequest = new CreateTopicsRequest.Builder(Map( "duplicate-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort), "other-topic" -> new CreateTopicsRequest.TopicDetails(1, 1.toShort)).asJava, 1000).build() - val duplicateDoubleRequest = duplicateFirstTopic(doubleRequest) - assertFalse("Request doesn't have duplicate topics", duplicateDoubleRequest.duplicateTopics.isEmpty) - validateErrorCreateTopicsRequests(duplicateDoubleRequest, Map( + validateErrorCreateTopicsRequests(doubleRequest, Map( "duplicate-topic" -> error(Errors.INVALID_REQUEST), - "other-topic" -> error(Errors.NONE)), checkErrorMessage = false) + "other-topic" -> error(Errors.NONE)), checkErrorMessage = false, + requestStruct = Some(toStructWithDuplicateFirstTopic(doubleRequest))) // Partitions/ReplicationFactor and ReplicaAssignment val assignments = replicaAssignmentToJava(Map(0 -> List(0))) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 9a092d0..9cd53d8 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -112,12 +112,12 @@ class DeleteTopicsRequestTest extends BaseRequestTest { } private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = { - val response = send(request, ApiKeys.DELETE_TOPICS, socketServer) + val response = connectAndSend(request, ApiKeys.DELETE_TOPICS, socketServer) DeleteTopicsResponse.parse(response, request.version) } private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = { - val response = send(request, ApiKeys.METADATA) - MetadataResponse.parse(response) + val response = connectAndSend(request, ApiKeys.METADATA) + MetadataResponse.parse(response, request.version) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala index 1f9e18b..2d4a22a 100755 --- a/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/EdgeCaseRequestTest.scala @@ -114,15 +114,15 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { val correlationId = -1 TestUtils.createTopic(zkUtils, topic, numPartitions = 1, replicationFactor = 1, servers = servers) + val version = 2: Short val serializedBytes = { - val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, 2, null, correlationId) + val headerBytes = requestHeaderBytes(ApiKeys.PRODUCE.id, version, null, correlationId) val messageBytes = "message".getBytes val records = MemoryRecords.readableRecords(ByteBuffer.wrap(messageBytes)) - val request = new ProduceRequest.Builder( - 1, 10000, Map(topicPartition -> records).asJava).build() - val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.sizeOf) + val request = new ProduceRequest.Builder(1, 10000, Map(topicPartition -> records).asJava).build() + val byteBuffer = ByteBuffer.allocate(headerBytes.length + request.toStruct.sizeOf) byteBuffer.put(headerBytes) - request.writeTo(byteBuffer) + request.toStruct.writeTo(byteBuffer) byteBuffer.array() } @@ -130,13 +130,13 @@ class EdgeCaseRequestTest extends KafkaServerTestHarness { val responseBuffer = ByteBuffer.wrap(response) val responseHeader = ResponseHeader.parse(responseBuffer) - val produceResponse = ProduceResponse.parse(responseBuffer) + val produceResponse = ProduceResponse.parse(responseBuffer, version) - assertEquals("The response should parse completely", 0, responseBuffer.remaining()) - assertEquals("The correlationId should match request", correlationId, responseHeader.correlationId()) - assertEquals("One partition response should be returned", 1, produceResponse.responses().size()) + assertEquals("The response should parse completely", 0, responseBuffer.remaining) + assertEquals("The correlationId should match request", correlationId, responseHeader.correlationId) + assertEquals("One partition response should be returned", 1, produceResponse.responses.size) - val partitionResponse = produceResponse.responses().get(topicPartition) + val partitionResponse = produceResponse.responses.get(topicPartition) assertNotNull(partitionResponse) assertEquals("There should be no error", Errors.NONE, partitionResponse.error) } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index 3811360..64be5b3 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -24,7 +24,7 @@ import kafka.utils.TestUtils import kafka.utils.TestUtils._ import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.protocol.{ApiKeys, Errors} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils} import org.apache.kafka.common.record.LogEntry import org.apache.kafka.common.requests.{FetchRequest, FetchResponse} import org.apache.kafka.common.serialization.StringSerializer @@ -56,7 +56,7 @@ class FetchRequestTest extends BaseRequestTest { private def createFetchRequest(maxResponseBytes: Int, maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition], offsetMap: Map[TopicPartition, Long] = Map.empty): FetchRequest = - new FetchRequest.Builder(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) + FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, topicPartitions, offsetMap)) .setMaxBytes(maxResponseBytes).build() private def createPartitionMap(maxPartitionBytes: Int, topicPartitions: Seq[TopicPartition], @@ -69,8 +69,8 @@ class FetchRequestTest extends BaseRequestTest { } private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = { - val response = send(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId)) - FetchResponse.parse(response) + val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId)) + FetchResponse.parse(response, ProtoUtils.latestVersion(ApiKeys.FETCH.id)) } @Test @@ -156,10 +156,9 @@ class FetchRequestTest extends BaseRequestTest { val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get - val fetchRequestBuilder = new FetchRequest.Builder( - Int.MaxValue, 0, createPartitionMap(maxPartitionBytes, Seq(topicPartition))). - setVersion(2) - val fetchResponse = sendFetchRequest(leaderId, fetchRequestBuilder.build()) + val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, + createPartitionMap(maxPartitionBytes, Seq(topicPartition))).build(2) + val fetchResponse = sendFetchRequest(leaderId, fetchRequest) val partitionData = fetchResponse.responseData.get(topicPartition) assertEquals(Errors.NONE, partitionData.error) assertTrue(partitionData.highWatermark > 0) http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 99a95ad..1b5007d 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -22,7 +22,7 @@ import util.Arrays.asList import kafka.common.BrokerEndPointNotAvailableException import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.protocol.{Errors, SecurityProtocol} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, SecurityProtocol} import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest} import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint} import org.junit.Test @@ -69,8 +69,9 @@ class MetadataCacheTest { new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 1, asList(1), zkVersion, asSet(1)), new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 2, asList(2), zkVersion, asSet(2))) - val updateMetadataRequest = new UpdateMetadataRequest.Builder( - controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() + val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, + partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, SecurityProtocol.SSL)) { @@ -120,8 +121,9 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, asList(0), zkVersion, asSet(0))) - val updateMetadataRequest = new UpdateMetadataRequest.Builder( - controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() + val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, + partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName) @@ -163,8 +165,9 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas)) - val updateMetadataRequest = new UpdateMetadataRequest.Builder( - controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() + val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, + partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) // Validate errorUnavailableEndpoints = false @@ -222,8 +225,9 @@ class MetadataCacheTest { val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, zkVersion, replicas)) - val updateMetadataRequest = new UpdateMetadataRequest.Builder( - controllerId, controllerEpoch, partitionStates.asJava, brokers.asJava).build() + val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, controllerId, controllerEpoch, + partitionStates.asJava, brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) // Validate errorUnavailableEndpoints = false @@ -273,8 +277,9 @@ class MetadataCacheTest { val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) - val updateMetadataRequest = new UpdateMetadataRequest.Builder( - 2, controllerEpoch, partitionStates.asJava, brokers.asJava).build() + val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava, + brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) try { @@ -305,8 +310,9 @@ class MetadataCacheTest { val isr = asList[Integer](0, 1) val partitionStates = Map( new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, leader, leaderEpoch, isr, 3, replicas)) - val updateMetadataRequest = new UpdateMetadataRequest.Builder( - 2, controllerEpoch, partitionStates.asJava, brokers.asJava).build() + val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id) + val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, partitionStates.asJava, + brokers.asJava).build() cache.updateCache(15, updateMetadataRequest) } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index f3bb912..ed0e805 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -36,14 +36,14 @@ class MetadataRequestTest extends BaseRequestTest { @Test def testClusterIdWithRequestVersion1() { - val v1MetadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort)) + val v1MetadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) val v1ClusterId = v1MetadataResponse.clusterId assertNull(s"v1 clusterId should be null", v1ClusterId) } @Test def testClusterIdIsValid() { - val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(2.toShort)) + val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(2.toShort)) isValidClusterId(metadataResponse.clusterId) } @@ -51,7 +51,7 @@ class MetadataRequestTest extends BaseRequestTest { def testControllerId() { val controllerServer = servers.find(_.kafkaController.isActive).get val controllerId = controllerServer.config.brokerId - val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort)) + val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) assertEquals("Controller id should match the active controller", controllerId, metadataResponse.controller.id) @@ -64,14 +64,14 @@ class MetadataRequestTest extends BaseRequestTest { val controllerId2 = controllerServer2.config.brokerId assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2) TestUtils.waitUntilTrue(() => { - val metadataResponse2 = sendMetadataRequest(MetadataRequest.allTopics(1.toShort)) + val metadataResponse2 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id }, "Controller id should match the active controller after failover", 5000) } @Test def testRack() { - val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort)) + val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) // Validate rack matches what's set in generateConfigs() above metadataResponse.brokers.asScala.foreach { broker => assertEquals("Rack information should match config", s"rack/${broker.id}", broker.rack) @@ -86,7 +86,7 @@ class MetadataRequestTest extends BaseRequestTest { TestUtils.createTopic(zkUtils, internalTopic, 3, 2, servers) TestUtils.createTopic(zkUtils, notInternalTopic, 3, 2, servers) - val metadataResponse = sendMetadataRequest(MetadataRequest.allTopics(1.toShort)) + val metadataResponse = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) assertTrue("Response should have no errors", metadataResponse.errors.isEmpty) val topicMetadata = metadataResponse.topicMetadata.asScala @@ -124,7 +124,7 @@ class MetadataRequestTest extends BaseRequestTest { assertEquals("V0 Response should have 2 (all) topics", 2, metadataResponseV0.topicMetadata.size()) // v1, Null represents all topics - val metadataResponseV1 = sendMetadataRequest(MetadataRequest.allTopics(1.toShort)) + val metadataResponseV1 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) assertTrue("V1 Response should have no errors", metadataResponseV1.errors.isEmpty) assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size()) } @@ -177,7 +177,7 @@ class MetadataRequestTest extends BaseRequestTest { } private def sendMetadataRequest(request: MetadataRequest): MetadataResponse = { - val response = send(request, ApiKeys.METADATA) + val response = connectAndSend(request, ApiKeys.METADATA) MetadataResponse.parse(response, request.version) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index b05be9d..81118fa 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -88,8 +88,8 @@ class ProduceRequestTest extends BaseRequestTest { } private def sendProduceRequest(leaderId: Int, request: ProduceRequest): ProduceResponse = { - val response = send(request, ApiKeys.PRODUCE, destination = brokerSocketServer(leaderId)) - ProduceResponse.parse(response) + val response = connectAndSend(request, ApiKeys.PRODUCE, destination = brokerSocketServer(leaderId)) + ProduceResponse.parse(response, request.version) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 927ace9..92a518d 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -20,14 +20,13 @@ import java.io.IOException import java.net.Socket import java.util.Collections -import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, SecurityProtocol} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests.{ApiVersionsRequest, ApiVersionsResponse} import org.apache.kafka.common.requests.SaslHandshakeRequest import org.apache.kafka.common.requests.SaslHandshakeResponse import org.junit.Test import org.junit.Assert._ import kafka.api.SaslTestHarness -import org.apache.kafka.common.protocol.types.Struct class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT @@ -42,8 +41,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { def testApiVersionsRequestBeforeSaslHandshakeRequest() { val plaintextSocket = connect(protocol = securityProtocol) try { - val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, - new ApiVersionsRequest.Builder().setVersion(0).build()) + val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0)) ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse) sendSaslHandshakeRequestValidateResponse(plaintextSocket) } finally { @@ -57,8 +55,7 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { try { sendSaslHandshakeRequestValidateResponse(plaintextSocket) try { - sendApiVersionsRequest(plaintextSocket, - new ApiVersionsRequest.Builder().setVersion(0).build()) + sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0)) fail("Versions Request during Sasl handshake did not fail") } catch { case _: IOException => // expected exception @@ -72,12 +69,10 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { def testApiVersionsRequestWithUnsupportedVersion() { val plaintextSocket = connect(protocol = securityProtocol) try { - val apiVersionsRequest = new ApiVersionsRequest( - new Struct(ProtoUtils.requestSchema(ApiKeys.API_VERSIONS.id, 0)), Short.MaxValue); - val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, apiVersionsRequest) + val apiVersionsRequest = new ApiVersionsRequest(0) + val apiVersionsResponse = sendApiVersionsRequest(plaintextSocket, apiVersionsRequest, Some(Short.MaxValue)) assertEquals(Errors.UNSUPPORTED_VERSION, apiVersionsResponse.error) - val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket, - new ApiVersionsRequest.Builder().setVersion(0).build()) + val apiVersionsResponse2 = sendApiVersionsRequest(plaintextSocket, new ApiVersionsRequest.Builder().build(0)) ApiVersionsRequestTest.validateApiVersionsResponse(apiVersionsResponse2) sendSaslHandshakeRequestValidateResponse(plaintextSocket) } finally { @@ -85,15 +80,17 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { } } - private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest): ApiVersionsResponse = { - val response = send(request, ApiKeys.API_VERSIONS, socket) - ApiVersionsResponse.parse(response) + private def sendApiVersionsRequest(socket: Socket, request: ApiVersionsRequest, + apiVersion: Option[Short] = None): ApiVersionsResponse = { + val response = send(request, ApiKeys.API_VERSIONS, socket, apiVersion) + ApiVersionsResponse.parse(response, request.version) } private def sendSaslHandshakeRequestValidateResponse(socket: Socket) { - val response = send(new SaslHandshakeRequest("PLAIN"), ApiKeys.SASL_HANDSHAKE, socket) - val handshakeResponse = SaslHandshakeResponse.parse(response) + val request = new SaslHandshakeRequest("PLAIN") + val response = send(request, ApiKeys.SASL_HANDSHAKE, socket) + val handshakeResponse = SaslHandshakeResponse.parse(response, request.version) assertEquals(Errors.NONE, handshakeResponse.error) - assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms()) + assertEquals(Collections.singletonList("PLAIN"), handshakeResponse.enabledMechanisms) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java index 69b3c46..5399653 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java @@ -106,8 +106,8 @@ public class InternalTopicManagerTest { MetadataResponse.PartitionMetadata partitionMetadata = new MetadataResponse.PartitionMetadata(Errors.NONE, 1, node, new ArrayList<Node>(), new ArrayList<Node>()); MetadataResponse.TopicMetadata topicMetadata = new MetadataResponse.TopicMetadata(Errors.NONE, topic, true, Collections.singletonList(partitionMetadata)); MetadataResponse response = new MetadataResponse(Collections.<Node>emptyList(), null, MetadataResponse.NO_CONTROLLER_ID, - Collections.singletonList(topicMetadata), 0); + Collections.singletonList(topicMetadata)); return response; } } -} \ No newline at end of file +}