This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new acd669e  KAFKA-6796; Fix surprising UNKNOWN_TOPIC error from requests 
to non-replicas (#4883)
acd669e is described below

commit acd669e4248c605ac54c38862a8678f521f6f574
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Apr 24 21:49:44 2018 -0700

    KAFKA-6796; Fix surprising UNKNOWN_TOPIC error from requests to 
non-replicas (#4883)
    
    Currently, if the client sends a produce request or a fetch request to a
    broker which isn't a replica of the partition, we return
    UNKNOWN_TOPIC_OR_PARTITION. This is a bit surprising to see when the topic
    actually exists. It would be better to return NOT_LEADER_FOR_PARTITION to
    avoid confusion. Clients typically handle both errors by refreshing metadata
    and retrying, so changing this should not cause any change in behavior on
    the client. This case can be hit following a partition reassignment after
    the leader is moved and the local replica is deleted.
    
    To validate the current behavior and the fix, I've added integration tests 
for the fetch and produce APIs.
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  32 +++---
 .../main/scala/kafka/server/ReplicaManager.scala   |  23 ++--
 .../scala/unit/kafka/server/FetchRequestTest.scala |  21 ++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 127 +++++++++++++++++----
 .../unit/kafka/server/ProduceRequestTest.scala     |  23 ++++
 5 files changed, 176 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index a0caa4a..7bc9e3e 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -287,7 +287,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       for ((topicPartition, partitionData) <- 
offsetCommitRequest.offsetData.asScala) {
         if (!authorize(request.session, Read, new Resource(Topic, 
topicPartition.topic)))
           unauthorizedTopicErrors += (topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED)
-        else if (!metadataCache.contains(topicPartition.topic))
+        else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += (topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
           authorizedTopicRequestInfoBldr += (topicPartition -> partitionData)
@@ -401,7 +401,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     for ((topicPartition, memoryRecords) <- 
produceRequest.partitionRecordsOrFail.asScala) {
       if (!authorize(request.session, Write, new Resource(Topic, 
topicPartition.topic)))
         unauthorizedTopicResponses += topicPartition -> new 
PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
-      else if (!metadataCache.contains(topicPartition.topic))
+      else if (!metadataCache.contains(topicPartition))
         nonExistingTopicResponses += topicPartition -> new 
PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
       else
         authorizedRequestInfo += (topicPartition -> memoryRecords)
@@ -502,13 +502,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (fetchRequest.isFromFollower()) {
       // The follower must have ClusterAction on ClusterResource in order to 
fetch partition data.
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) 
{
-        fetchContext.foreachPartition((part, data) => {
-          if (!metadataCache.contains(part.topic)) {
-            erroneous += part -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+        fetchContext.foreachPartition((topicPartition, data) => {
+          if (!metadataCache.contains(topicPartition)) {
+            erroneous += topicPartition -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
               FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
               FetchResponse.INVALID_LOG_START_OFFSET, null, 
MemoryRecords.EMPTY)
           } else {
-            interesting += (part -> data)
+            interesting += (topicPartition -> data)
           }
         })
       } else {
@@ -520,17 +520,17 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     } else {
       // Regular Kafka consumers need READ permission on each partition they 
are fetching.
-      fetchContext.foreachPartition((part, data) => {
-        if (!authorize(request.session, Read, new Resource(Topic, part.topic)))
-          erroneous += part -> new 
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
+      fetchContext.foreachPartition((topicPartition, data) => {
+        if (!authorize(request.session, Read, new Resource(Topic, 
topicPartition.topic)))
+          erroneous += topicPartition -> new 
FetchResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
             FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
             FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
-        else if (!metadataCache.contains(part.topic))
-          erroneous += part -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+        else if (!metadataCache.contains(topicPartition))
+          erroneous += topicPartition -> new 
FetchResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
             FetchResponse.INVALID_HIGHWATERMARK, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
             FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
         else
-          interesting += (part -> data)
+          interesting += (topicPartition -> data)
       })
     }
 
@@ -1062,7 +1062,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             // version 0 reads offsets from ZK
             val authorizedPartitionData = authorizedPartitions.map { 
topicPartition =>
               try {
-                if (!metadataCache.contains(topicPartition.topic))
+                if (!metadataCache.contains(topicPartition))
                   (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
                 else {
                   val payloadOpt = 
zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
@@ -1508,7 +1508,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       if (!authorize(request.session, Delete, new Resource(Topic, 
topicPartition.topic)))
         unauthorizedTopicResponses += topicPartition -> new 
DeleteRecordsResponse.PartitionResponse(
           DeleteRecordsResponse.INVALID_LOW_WATERMARK, 
Errors.TOPIC_AUTHORIZATION_FAILED)
-      else if (!metadataCache.contains(topicPartition.topic))
+      else if (!metadataCache.contains(topicPartition))
         nonExistingTopicResponses += topicPartition -> new 
DeleteRecordsResponse.PartitionResponse(
           DeleteRecordsResponse.INVALID_LOW_WATERMARK, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
       else
@@ -1720,7 +1720,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         if 
(org.apache.kafka.common.internals.Topic.isInternal(topicPartition.topic) ||
             !authorize(request.session, Write, new Resource(Topic, 
topicPartition.topic)))
           unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition.topic))
+        else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
         else
           authorizedPartitions.add(topicPartition)
@@ -1806,7 +1806,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       for ((topicPartition, commitedOffset) <- 
txnOffsetCommitRequest.offsets.asScala) {
         if (!authorize(request.session, Read, new Resource(Topic, 
topicPartition.topic)))
           unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-        else if (!metadataCache.contains(topicPartition.topic))
+        else if (!metadataCache.contains(topicPartition))
           nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
         else
           authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 0c2b0d8..da50117 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -426,8 +426,16 @@ class ReplicaManager(val config: KafkaConfig,
   def getPartitionAndLeaderReplicaIfLocal(topicPartition: TopicPartition): 
(Partition, Replica) =  {
     val partitionOpt = getPartition(topicPartition)
     partitionOpt match {
+      case None if metadataCache.contains(topicPartition) =>
+        // The topic exists, but this broker is no longer a replica of it, so 
we return NOT_LEADER which
+        // forces clients to refresh metadata to find the new location. This 
can happen, for example,
+        // during a partition reassignment if a produce request from the 
client is sent to a broker after
+        // the local replica has been deleted.
+        throw new NotLeaderForPartitionException(s"Broker $localBrokerId is 
not a replica of $topicPartition")
+
       case None =>
-        throw new UnknownTopicOrPartitionException(s"Partition $topicPartition 
doesn't exist on $localBrokerId")
+        throw new UnknownTopicOrPartitionException(s"Partition $topicPartition 
doesn't exist")
+
       case Some(partition) =>
         if (partition eq ReplicaManager.OfflinePartition)
           throw new KafkaStorageException(s"Partition $topicPartition is in an 
offline log directory on broker $localBrokerId")
@@ -736,17 +744,8 @@ class ReplicaManager(val config: KafkaConfig,
           Some(new InvalidTopicException(s"Cannot append to internal topic 
${topicPartition.topic}"))))
       } else {
         try {
-          val partitionOpt = getPartition(topicPartition)
-          val info = partitionOpt match {
-            case Some(partition) =>
-              if (partition eq ReplicaManager.OfflinePartition)
-                throw new KafkaStorageException(s"Partition $topicPartition is 
in an offline log directory on broker $localBrokerId")
-              partition.appendRecordsToLeader(records, isFromClient, 
requiredAcks)
-
-            case None => throw new UnknownTopicOrPartitionException("Partition 
%s doesn't exist on %d"
-              .format(topicPartition, localBrokerId))
-          }
-
+          val (partition, _) = 
getPartitionAndLeaderReplicaIfLocal(topicPartition)
+          val info = partition.appendRecordsToLeader(records, isFromClient, 
requiredAcks)
           val numAppendedMessages = info.numMessages
 
           // update stats for successfully appended bytes and messages as 
bytesInRate and messageInRate
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index f2b3552..03137e1 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -170,6 +170,27 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
   }
 
+  @Test
+  def testFetchRequestToNonReplica(): Unit = {
+    val topic = "topic"
+    val partition = 0
+    val topicPartition = new TopicPartition(topic, partition)
+
+    // Create a single-partition topic and find a broker which is not the 
leader
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, 1, servers)
+    val leader = partitionToLeader(partition)
+    val nonReplicaOpt = servers.find(_.config.brokerId != leader)
+    assertTrue(nonReplicaOpt.isDefined)
+    val nonReplicaId =  nonReplicaOpt.get.config.brokerId
+
+    // Send the fetch request to the non-replica and verify the error code
+    val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(1024,
+      Seq(topicPartition))).build()
+    val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest)
+    val partitionData = fetchResponse.responseData.get(topicPartition)
+    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, partitionData.error)
+  }
+
   /**
    * Tests that down-conversions dont leak memory. Large down conversions are 
triggered
    * in the server. The client closes its connection after reading partial 
data when the
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 382364f..96f74a0 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.lang.{Long => JLong}
 import java.net.InetAddress
 import java.util
+import java.util.Collections
 
 import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0}
 import kafka.cluster.Replica
@@ -40,6 +41,7 @@ import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, 
EndPoint}
 import org.apache.kafka.common.requests.WriteTxnMarkersRequest.TxnMarkerEntry
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
@@ -106,6 +108,84 @@ class KafkaApisTest {
     )
   }
 
+  @Test
+  def testOffsetCommitWithInvalidPartition(): Unit = {
+    val topic = "topic"
+    setupBasicMetadataCache(topic, numPartitions = 1)
+
+    def checkInvalidPartition(invalidPartitionId: Int): Unit = {
+      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+
+      val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
+      val partitionOffsetCommitData = new 
OffsetCommitRequest.PartitionData(15L, "")
+      val (offsetCommitRequest, request) = buildRequest(new 
OffsetCommitRequest.Builder("groupId",
+        Map(invalidTopicPartition -> partitionOffsetCommitData).asJava))
+
+      val capturedResponse = expectThrottleCallbackAndInvoke()
+      EasyMock.replay(replicaManager, clientRequestQuotaManager, 
requestChannel)
+      createKafkaApis().handleOffsetCommitRequest(request)
+
+      val response = readResponse(ApiKeys.OFFSET_COMMIT, offsetCommitRequest, 
capturedResponse)
+        .asInstanceOf[OffsetCommitResponse]
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.responseData().get(invalidTopicPartition))
+    }
+
+    checkInvalidPartition(-1)
+    checkInvalidPartition(1) // topic has only one partition
+  }
+
+  @Test
+  def testTxnOffsetCommitWithInvalidPartition(): Unit = {
+    val topic = "topic"
+    setupBasicMetadataCache(topic, numPartitions = 1)
+
+    def checkInvalidPartition(invalidPartitionId: Int): Unit = {
+      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+
+      val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
+      val partitionOffsetCommitData = new 
TxnOffsetCommitRequest.CommittedOffset(15L, "")
+      val (offsetCommitRequest, request) = buildRequest(new 
TxnOffsetCommitRequest.Builder("txnlId", "groupId",
+        15L, 0.toShort, Map(invalidTopicPartition -> 
partitionOffsetCommitData).asJava))
+
+      val capturedResponse = expectThrottleCallbackAndInvoke()
+      EasyMock.replay(replicaManager, clientRequestQuotaManager, 
requestChannel)
+      createKafkaApis().handleTxnOffsetCommitRequest(request)
+
+      val response = readResponse(ApiKeys.TXN_OFFSET_COMMIT, 
offsetCommitRequest, capturedResponse)
+        .asInstanceOf[TxnOffsetCommitResponse]
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.errors().get(invalidTopicPartition))
+    }
+
+    checkInvalidPartition(-1)
+    checkInvalidPartition(1) // topic has only one partition
+  }
+
+  @Test
+  def testAddPartitionsToTxnWithInvalidPartition(): Unit = {
+    val topic = "topic"
+    setupBasicMetadataCache(topic, numPartitions = 1)
+
+    def checkInvalidPartition(invalidPartitionId: Int): Unit = {
+      EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+
+      val invalidTopicPartition = new TopicPartition(topic, invalidPartitionId)
+
+      val (addPartitionsToTxnRequest, request) = buildRequest(new 
AddPartitionsToTxnRequest.Builder(
+        "txnlId", 15L, 0.toShort, List(invalidTopicPartition).asJava))
+
+      val capturedResponse = expectThrottleCallbackAndInvoke()
+      EasyMock.replay(replicaManager, clientRequestQuotaManager, 
requestChannel)
+      createKafkaApis().handleAddPartitionToTxnRequest(request)
+
+      val response = readResponse(ApiKeys.ADD_PARTITIONS_TO_TXN, 
addPartitionsToTxnRequest, capturedResponse)
+        .asInstanceOf[AddPartitionsToTxnResponse]
+      assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
response.errors().get(invalidTopicPartition))
+    }
+
+    checkInvalidPartition(-1)
+    checkInvalidPartition(1) // topic has only one partition
+  }
+
   @Test(expected = classOf[UnsupportedVersionException])
   def 
shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported():
 Unit = {
     createKafkaApis(KAFKA_0_10_2_IV0).handleAddOffsetsToTxnRequest(null)
@@ -284,8 +364,6 @@ class KafkaApisTest {
     val timestamp: JLong = time.milliseconds()
     val limitOffset = 15L
 
-    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
-    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
     val replica = EasyMock.mock(classOf[Replica])
     val log = EasyMock.mock(classOf[Log])
     
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
@@ -295,8 +373,7 @@ class KafkaApisTest {
       
EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset
 = limitOffset))
     EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
     
EasyMock.expect(log.fetchOffsetsByTimestamp(timestamp)).andReturn(Some(TimestampOffset(timestamp
 = timestamp, offset = limitOffset)))
-    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
-    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    val capturedResponse = expectThrottleCallbackAndInvoke()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
replica, log)
 
     val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@@ -327,8 +404,6 @@ class KafkaApisTest {
     val tp = new TopicPartition("foo", 0)
     val limitOffset = 15L
 
-    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
-    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
     val replica = EasyMock.mock(classOf[Replica])
     val log = EasyMock.mock(classOf[Log])
     
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
@@ -339,8 +414,7 @@ class KafkaApisTest {
     EasyMock.expect(replicaManager.getLog(tp)).andReturn(Some(log))
     
EasyMock.expect(log.fetchOffsetsByTimestamp(ListOffsetRequest.EARLIEST_TIMESTAMP))
       .andReturn(Some(TimestampOffset(timestamp = 
ListOffsetResponse.UNKNOWN_TIMESTAMP, offset = limitOffset)))
-    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
-    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    val capturedResponse = expectThrottleCallbackAndInvoke()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
replica, log)
 
     val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@@ -393,14 +467,12 @@ class KafkaApisTest {
    * Return pair of listener names in the metadataCache: PLAINTEXT and 
LISTENER2 respectively.
    */
   private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, 
ListenerName) = {
-    import UpdateMetadataRequest.{Broker => UBroker}
-    import UpdateMetadataRequest.{EndPoint => UEndPoint}
     val plaintextListener = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
     val anotherListener = new ListenerName("LISTENER2")
     val brokers = Set(
-      new UBroker(0, Seq(new UEndPoint("broker0", 9092, 
SecurityProtocol.PLAINTEXT, plaintextListener),
-        new UEndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, 
anotherListener)).asJava, "rack"),
-      new UBroker(1, Seq(new UEndPoint("broker1", 9092, 
SecurityProtocol.PLAINTEXT, plaintextListener)).asJava,
+      new Broker(0, Seq(new EndPoint("broker0", 9092, 
SecurityProtocol.PLAINTEXT, plaintextListener),
+        new EndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, 
anotherListener)).asJava, "rack"),
+      new Broker(1, Seq(new EndPoint("broker1", 9092, 
SecurityProtocol.PLAINTEXT, plaintextListener)).asJava,
         "rack")
     )
     val updateMetadataRequest = new 
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
@@ -410,10 +482,7 @@ class KafkaApisTest {
   }
 
   private def sendMetadataRequestWithInconsistentListeners(requestListener: 
ListenerName): MetadataResponse = {
-    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
-    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
-    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
-    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    val capturedResponse = expectThrottleCallbackAndInvoke()
     EasyMock.replay(clientRequestQuotaManager, requestChannel)
 
     val (metadataRequest, requestChannelRequest) = 
buildRequest(MetadataRequest.Builder.allTopics, requestListener)
@@ -426,8 +495,6 @@ class KafkaApisTest {
     val tp = new TopicPartition("foo", 0)
     val latestOffset = 15L
 
-    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
-    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
     val replica = EasyMock.mock(classOf[Replica])
     val log = EasyMock.mock(classOf[Log])
     
EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(tp)).andReturn(replica)
@@ -435,8 +502,8 @@ class KafkaApisTest {
       
EasyMock.expect(replica.highWatermark).andReturn(LogOffsetMetadata(messageOffset
 = latestOffset))
     else
       
EasyMock.expect(replica.lastStableOffset).andReturn(LogOffsetMetadata(messageOffset
 = latestOffset))
-    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
-    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+
+    val capturedResponse = expectThrottleCallbackAndInvoke()
     EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, 
replica, log)
 
     val builder = ListOffsetRequest.Builder.forConsumer(true, isolationLevel)
@@ -484,7 +551,8 @@ class KafkaApisTest {
     AbstractResponse.parseResponse(api, struct)
   }
 
-  private def expectThrottleCallbackAndInvoke(capturedThrottleCallback: 
Capture[Int => Unit]): Unit = {
+  private def expectThrottleCallbackAndInvoke(): 
Capture[RequestChannel.Response] = {
+    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
     EasyMock.expect(clientRequestQuotaManager.maybeRecordAndThrottle(
       EasyMock.anyObject[RequestChannel.Request](),
       EasyMock.capture(capturedThrottleCallback)))
@@ -494,6 +562,21 @@ class KafkaApisTest {
           callback(0)
         }
       })
+
+    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+    
EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    capturedResponse
+  }
+
+  private def setupBasicMetadataCache(topic: String, numPartitions: Int = 1): 
Unit = {
+    val replicas = List(0.asInstanceOf[Integer]).asJava
+    val partitionState = new UpdateMetadataRequest.PartitionState(1, 0, 1, 
replicas, 0, replicas, Collections.emptyList())
+    val plaintextListener = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val broker = new Broker(0, Seq(new EndPoint("broker0", 9092, 
SecurityProtocol.PLAINTEXT, plaintextListener)).asJava, "rack")
+    val partitions = (0 until numPartitions).map(new TopicPartition(topic, _) 
-> partitionState).toMap
+    val updateMetadataRequest = new 
UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+      0, partitions.asJava, Set(broker).asJava).build()
+    metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 6024499..4e66494 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -59,6 +59,29 @@ class ProduceRequestTest extends BaseRequestTest {
       new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, 
"value2".getBytes)), 1)
   }
 
+  @Test
+  def testProduceToNonReplica() {
+    val topic = "topic"
+    val partition = 0
+
+    // Create a single-partition topic and find a broker which is not the 
leader
+    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, 1, servers)
+    val leader = partitionToLeader(partition)
+    val nonReplicaOpt = servers.find(_.config.brokerId != leader)
+    assertTrue(nonReplicaOpt.isDefined)
+    val nonReplicaId =  nonReplicaOpt.get.config.brokerId
+
+    // Send the produce request to the non-replica
+    val records = MemoryRecords.withRecords(CompressionType.NONE, new 
SimpleRecord("key".getBytes, "value".getBytes))
+    val topicPartition = new TopicPartition("topic", partition)
+    val partitionRecords = Map(topicPartition -> records)
+    val produceRequest = ProduceRequest.Builder.forCurrentMagic(-1, 3000, 
partitionRecords.asJava).build()
+
+    val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
+    assertEquals(1, produceResponse.responses.size)
+    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, 
produceResponse.responses.asScala.head._2.error)
+  }
+
   /* returns a pair of partition id and leader id */
   private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) 
= {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, 3, 2, 
servers)

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to