[ https://issues.apache.org/jira/browse/KAFKA-6796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16451660#comment-16451660 ]
ASF GitHub Bot commented on KAFKA-6796: --------------------------------------- ijuma closed pull request #4883: KAFKA-6796; Fix surprising UNKNOWN_TOPIC error from requests to non-replicas URL: https://github.com/apache/kafka/pull/4883 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a0caa4a53c0..7bc9e3ee750 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -287,7 +287,7 @@ class KafkaApis(val requestChannel: RequestChannel, for ((topicPartition, partitionData) <- offsetCommitRequest.offsetData.asScala) { if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) unauthorizedTopicErrors += (topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED) - else if (!metadataCache.contains(topicPartition.topic)) + else if (!metadataCache.contains(topicPartition)) nonExistingTopicErrors += (topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION) else authorizedTopicRequestInfoBldr += (topicPartition -> partitionData) @@ -401,7 +401,7 @@ class KafkaApis(val requestChannel: RequestChannel, for ((topicPartition, memoryRecords) <- produceRequest.partitionRecordsOrFail.asScala) { if (!authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED) - else if (!metadataCache.contains(topicPartition.topic)) + else if (!metadataCache.contains(topicPartition)) nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION) else authorizedRequestInfo += (topicPartition -> memoryRecords) @@ -502,13 +502,13 @@ class KafkaApis(val requestChannel: RequestChannel, if (fetchRequest.isFromFollower()) { // The follower must have ClusterAction on ClusterResource in order to fetch partition data. if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { - fetchContext.foreachPartition((part, data) => { - if (!metadataCache.contains(part.topic)) { - erroneous += part -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + fetchContext.foreachPartition((topicPartition, data) => { + if (!metadataCache.contains(topicPartition)) { + erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) } else { - interesting += (part -> data) + interesting += (topicPartition -> data) } }) } else { @@ -520,17 +520,17 @@ class KafkaApis(val requestChannel: RequestChannel, } } else { // Regular Kafka consumers need READ permission on each partition they are fetching. - fetchContext.foreachPartition((part, data) => { - if (!authorize(request.session, Read, new Resource(Topic, part.topic))) - erroneous += part -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, + fetchContext.foreachPartition((topicPartition, data) => { + if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) + erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) - else if (!metadataCache.contains(part.topic)) - erroneous += part -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + else if (!metadataCache.contains(topicPartition)) + erroneous += topicPartition -> new FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY) else - interesting += (part -> data) + interesting += (topicPartition -> data) }) } @@ -1062,7 +1062,7 @@ class KafkaApis(val requestChannel: RequestChannel, // version 0 reads offsets from ZK val authorizedPartitionData = authorizedPartitions.map { topicPartition => try { - if (!metadataCache.contains(topicPartition.topic)) + if (!metadataCache.contains(topicPartition)) (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION) else { val payloadOpt = zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition) @@ -1508,7 +1508,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, Delete, new Resource(Topic, topicPartition.topic))) unauthorizedTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse( DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.TOPIC_AUTHORIZATION_FAILED) - else if (!metadataCache.contains(topicPartition.topic)) + else if (!metadataCache.contains(topicPartition)) nonExistingTopicResponses += topicPartition -> new DeleteRecordsResponse.PartitionResponse( DeleteRecordsResponse.INVALID_LOW_WATERMARK, Errors.UNKNOWN_TOPIC_OR_PARTITION) else @@ -1720,7 +1720,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) || !authorize(request.session, Write, new Resource(Topic, topicPartition.topic))) unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED - else if (!metadataCache.contains(topicPartition.topic)) + else if (!metadataCache.contains(topicPartition)) nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION else authorizedPartitions.add(topicPartition) @@ -1806,7 +1806,7 @@ class KafkaApis(val requestChannel: RequestChannel, for ((topicPartition, commitedOffset) <- txnOffsetCommitRequest.offsets.asScala) { if (!authorize(request.session, Read, new Resource(Topic, topicPartition.topic))) unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED - else if (!metadataCache.contains(topicPartition.topic)) + else if (!metadataCache.contains(topicPartition)) nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION else authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 0c2b0d805f8..da501174acd 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -426,8 +426,16 @@ class ReplicaManager(val config: KafkaConfig, def getPartitionAndLeaderReplicaIfLocal(topicPartition: TopicPartition): (Partition, Replica) = { val partitionOpt = getPartition(topicPartition) partitionOpt match { + case None if metadataCache.contains(topicPartition) => + // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which + // forces clients to refresh metadata to find the new location. This can happen, for example, + // during a partition reassignment if a produce request from the client is sent to a broker after + // the local replica has been deleted. + throw new NotLeaderForPartitionException(s"Broker $localBrokerId is not a replica of $topicPartition") + case None => - throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist on $localBrokerId") + throw new UnknownTopicOrPartitionException(s"Partition $topicPartition doesn't exist") + case Some(partition) => if (partition eq ReplicaManager.OfflinePartition) throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId") @@ -736,17 +744,8 @@ class ReplicaManager(val config: KafkaConfig, Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}")))) } else { try { - val partitionOpt = getPartition(topicPartition) - val info = partitionOpt match { - case Some(partition) => - if (partition eq ReplicaManager.OfflinePartition) - throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory on broker $localBrokerId") - partition.appendRecordsToLeader(records, isFromClient, requiredAcks) - - case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d" - .format(topicPartition, localBrokerId)) - } - + val (partition, _) = getPartitionAndLeaderReplicaIfLocal(topicPartition) + val info = partition.appendRecordsToLeader(records, isFromClient, requiredAcks) val numAppendedMessages = info.numMessages // update stats for successfully appended bytes and messages as bytesInRate and messageInRate diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala index f2b3552feaf..03137e10dea 100644 --- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala @@ -170,6 +170,27 @@ class FetchRequestTest extends BaseRequestTest { assertEquals(0, records(partitionData).map(_.sizeInBytes).sum) } + @Test + def testFetchRequestToNonReplica(): Unit = { + val topic = "topic" + val partition = 0 + val topicPartition = new TopicPartition(topic, partition) + + // Create a single-partition topic and find a broker which is not the leader + val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, 1, servers) + val leader = partitionToLeader(partition) + val nonReplicaOpt = servers.find(_.config.brokerId != leader) + assertTrue(nonReplicaOpt.isDefined) + val nonReplicaId = nonReplicaOpt.get.config.brokerId + + // Send the fetch request to the non-replica and verify the error code + val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(1024, + Seq(topicPartition))).build() + val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest) + val partitionData = fetchResponse.responseData.get(topicPartition) + assertEquals(Errors.NOT_LEADER_FOR_PARTITION, partitionData.error) + } + /** * Tests that down-conversions dont leak memory. Large down conversions are triggered * in the server. The client closes its connection after reading partial data when the diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 382364f7a83..96f74a0fffc 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -20,6 +20,7 @@ package kafka.server import java.lang.{Long => JLong} import java.net.InetAddress import java.util +import java.util.Collections import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0} import kafka.cluster.Replica @@ -40,6 +41,7 @@ import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, EndPoint} import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} @@ -106,6 +108,84 @@ class KafkaApisTest { ) } + @Test + def testOffsetCommitWithInvalidPartition(): Unit = { + val topic = "topic" + setupBasicMetadataCache(topic, numPartitions = 1) + + def checkInvalidPartition(invalidPartitionId: Int): Unit = { + EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel) + + val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId) + val partitionOffsetCommitData = new OffsetCommitRequest.PartitionData(15L, "") + val (offsetCommitRequest, request) = buildRequest(new OffsetCommitRequest.Builder("groupId", + Map(invalidTopicPartition -> partitionOffsetCommitData).asJava)) + + val capturedResponse = expectThrottleCallbackAndInvoke() + EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel) + createKafkaApis().handleOffsetCommitRequest(request) + + val response = readResponse(ApiKeys.OFFSET_COMMIT, offsetCommitRequest, capturedResponse) + .asInstanceOf[OffsetCommitResponse] + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.responseData().get(invalidTopicPartition)) + } + + checkInvalidPartition(-1) + checkInvalidPartition(1) // topic has only one partition + } + + @Test + def testTxnOffsetCommitWithInvalidPartition(): Unit = { + val topic = "topic" + setupBasicMetadataCache(topic, numPartitions = 1) + + def checkInvalidPartition(invalidPartitionId: Int): Unit = { + EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel) + + val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId) + val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "") + val (offsetCommitRequest, request) = buildRequest(new TxnOffsetCommitRequest.Builder("txnlId", "groupId", + 15L, 0.toShort, Map(invalidTopicPartition -> partitionOffsetCommitData).asJava)) + + val capturedResponse = expectThrottleCallbackAndInvoke() + EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel) + createKafkaApis().handleTxnOffsetCommitRequest(request) + + val response = readResponse(ApiKeys.TXN_OFFSET_COMMIT, offsetCommitRequest, capturedResponse) + .asInstanceOf[TxnOffsetCommitResponse] + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition)) + } + + checkInvalidPartition(-1) + checkInvalidPartition(1) // topic has only one partition + } + + @Test + def testAddPartitionsToTxnWithInvalidPartition(): Unit = { + val topic = "topic" + setupBasicMetadataCache(topic, numPartitions = 1) + + def checkInvalidPartition(invalidPartitionId: Int): Unit = { + EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel) + + val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId) + + val (addPartitionsToTxnRequest, request) = buildRequest(new AddPartitionsToTxnRequest.Builder( + "txnlId", 15L, 0.toShort, List(invalidTopicPartition).asJava)) + + val capturedResponse = expectThrottleCallbackAndInvoke() + EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel) + createKafkaApis().handleAddPartitionToTxnRequest(request) + + val response = readResponse(ApiKeys.ADD_PARTITIONS_TO_TXN, addPartitionsToTxnRequest, capturedResponse) + .asInstanceOf[AddPartitionsToTxnResponse] + assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, response.errors().get(invalidTopicPartition)) + } + + checkInvalidPartition(-1) + checkInvalidPartition(1) // topic has only one partition + } + @Test(expected = classOf[UnsupportedVersionException]) def shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported(): Unit = { createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null) @@ -284,8 +364,6 @@ class KafkaApisTest { val timestamp: JLong = time.milliseconds() val limitOffset = 15L - val capturedResponse = EasyMock.newCapture[RequestChannel.Response]() - val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]() val replica = EasyMock.mock(classOf[Replica]) val log = EasyMock.mock(classOf[Log]) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) @@ -295,8 +373,7 @@ class KafkaApisTest { EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = limitOffset)) EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log)) EasyMock.expect(log.fetchOffsetsByTimestamp(timestamp)).andReturn(Some(TimestampOffset(timestamp = timestamp, offset = limitOffset))) - expectThrottleCallbackAndInvoke(capturedThrottleCallback) - EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + val capturedResponse = expectThrottleCallbackAndInvoke() EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log) val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) @@ -327,8 +404,6 @@ class KafkaApisTest { val tp = new TopicPartition("foo", 0) val limitOffset = 15L - val capturedResponse = EasyMock.newCapture[RequestChannel.Response]() - val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]() val replica = EasyMock.mock(classOf[Replica]) val log = EasyMock.mock(classOf[Log]) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) @@ -339,8 +414,7 @@ class KafkaApisTest { EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log)) EasyMock.expect(log.fetchOffsetsByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP)) .andReturn(Some(TimestampOffset(timestamp = ListOffsetResponse.UNKNOWN_TIMESTAMP, offset = limitOffset))) - expectThrottleCallbackAndInvoke(capturedThrottleCallback) - EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + val capturedResponse = expectThrottleCallbackAndInvoke() EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log) val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) @@ -393,14 +467,12 @@ class KafkaApisTest { * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively. */ private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, ListenerName) = { - import UpdateMetadataRequest.{Broker => UBroker} - import UpdateMetadataRequest.{EndPoint => UEndPoint} val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) val anotherListener = new ListenerName("LISTENER2") val brokers = Set( - new UBroker(0, Seq(new UEndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener), - new UEndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, anotherListener)).asJava, "rack"), - new UBroker(1, Seq(new UEndPoint("broker1", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava, + new Broker(0, Seq(new EndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener), + new EndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, anotherListener)).asJava, "rack"), + new Broker(1, Seq(new EndPoint("broker1", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava, "rack") ) val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, @@ -410,10 +482,7 @@ class KafkaApisTest { } private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = { - val capturedResponse = EasyMock.newCapture[RequestChannel.Response]() - val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]() - expectThrottleCallbackAndInvoke(capturedThrottleCallback) - EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + val capturedResponse = expectThrottleCallbackAndInvoke() EasyMock.replay(clientRequestQuotaManager, requestChannel) val (metadataRequest, requestChannelRequest) = buildRequest(MetadataRequest.Builder.allTopics, requestListener) @@ -426,8 +495,6 @@ class KafkaApisTest { val tp = new TopicPartition("foo", 0) val latestOffset = 15L - val capturedResponse = EasyMock.newCapture[RequestChannel.Response]() - val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]() val replica = EasyMock.mock(classOf[Replica]) val log = EasyMock.mock(classOf[Log]) EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica) @@ -435,8 +502,8 @@ class KafkaApisTest { EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset = latestOffset)) else EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset = latestOffset)) - expectThrottleCallbackAndInvoke(capturedThrottleCallback) - EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + + val capturedResponse = expectThrottleCallbackAndInvoke() EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, replica, log) val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel) @@ -484,7 +551,8 @@ class KafkaApisTest { AbstractResponse.parseResponse(api, struct) } - private def expectThrottleCallbackAndInvoke(capturedThrottleCallback: Capture[Int => Unit]): Unit = { + private def expectThrottleCallbackAndInvoke(): Capture[RequestChannel.Response] = { + val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]() EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle( EasyMock.anyObject[RequestChannel.Request](), EasyMock.capture(capturedThrottleCallback))) @@ -494,6 +562,21 @@ class KafkaApisTest { callback(0) } }) + + val capturedResponse = EasyMock.newCapture[RequestChannel.Response]() + EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse))) + capturedResponse + } + + private def setupBasicMetadataCache(topic: String, numPartitions: Int = 1): Unit = { + val replicas = List(0.asInstanceOf[Integer]).asJava + val partitionState = new UpdateMetadataRequest.PartitionState(1, 0, 1, replicas, 0, replicas, Collections.emptyList()) + val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT) + val broker = new Broker(0, Seq(new EndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava, "rack") + val partitions = (0 until numPartitions).map(new TopicPartition(topic, _) -> partitionState).toMap + val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0, + 0, partitions.asJava, Set(broker).asJava).build() + metadataCache.updateCache(correlationId = 0, updateMetadataRequest) } } diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 60244996dc8..4e66494374f 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -59,6 +59,29 @@ class ProduceRequestTest extends BaseRequestTest { new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) } + @Test + def testProduceToNonReplica() { + val topic = "topic" + val partition = 0 + + // Create a single-partition topic and find a broker which is not the leader + val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, 1, servers) + val leader = partitionToLeader(partition) + val nonReplicaOpt = servers.find(_.config.brokerId != leader) + assertTrue(nonReplicaOpt.isDefined) + val nonReplicaId = nonReplicaOpt.get.config.brokerId + + // Send the produce request to the non-replica + val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("key".getBytes, "value".getBytes)) + val topicPartition = new TopicPartition("topic", partition) + val partitionRecords = Map(topicPartition -> records) + val produceRequest = ProduceRequest.Builder.forCurrentMagic(-1, 3000, partitionRecords.asJava).build() + + val produceResponse = sendProduceRequest(nonReplicaId, produceRequest) + assertEquals(1, produceResponse.responses.size) + assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResponse.responses.asScala.head._2.error) + } + /* returns a pair of partition id and leader id */ private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = { val partitionToLeader = TestUtils.createTopic(zkClient, topic, 3, 2, servers) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Surprising UNKNOWN_TOPIC error for produce/fetch requests to non-replicas > ------------------------------------------------------------------------- > > Key: KAFKA-6796 > URL: https://issues.apache.org/jira/browse/KAFKA-6796 > Project: Kafka > Issue Type: Bug > Affects Versions: 1.1.0, 1.0.1 > Reporter: Jason Gustafson > Assignee: Jason Gustafson > Priority: Major > > Currently if the client sends a produce request or a fetch request to a > broker which isn't a replica, we return UNKNOWN_TOPIC_OR_PARTITION. This is a > bit surprising to see when the topic actually exists. It would be better to > return NOT_LEADER to avoid confusion. Clients typically handle both errors by > refreshing metadata and retrying, so changing this should not cause any > change in behavior on the client. This case can be hit following a partition > reassignment after the leader is moved and the local replica is deleted. -- This message was sent by Atlassian JIRA (v7.6.3#76005)