This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 09b6b50  KAFKA-13080: Direct fetch snapshot request to kraft controller (#11041)
09b6b50 is described below

commit 09b6b506043c43f8c49e55249e9106e801ad4de5
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jul 13 16:14:15 2021 -0700

    KAFKA-13080: Direct fetch snapshot request to kraft controller (#11041)
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../main/scala/kafka/server/ControllerApis.scala   |  6 +++
 .../unit/kafka/server/ControllerApisTest.scala     | 55 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 2c6557f..0eddbb4 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -81,6 +81,7 @@ class ControllerApis(val requestChannel: RequestChannel,
     try {
       request.header.apiKey match {
         case ApiKeys.FETCH => handleFetch(request)
+        case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
         case ApiKeys.METADATA => handleMetadataRequest(request)
         case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
         case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
@@ -141,6 +142,11 @@ class ControllerApis(val requestChannel: RequestChannel,
     handleRaftRequest(request, response => new 
FetchResponse(response.asInstanceOf[FetchResponseData]))
   }
 
+  def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    handleRaftRequest(request, response => new 
FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
+  }
+
   def handleMetadataRequest(request: RequestChannel.Request): Unit = {
     val metadataRequest = request.body[MetadataRequest]
     def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 89fbd05..110ae72 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.net.InetAddress
 import java.util
 import java.util.Properties
+import java.util.concurrent.CompletableFuture
 import java.util.concurrent.ExecutionException
 import kafka.network.RequestChannel
 import kafka.raft.RaftManager
@@ -50,8 +51,9 @@ import 
org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig
 import 
org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse
 => OldAlterConfigsResourceResponse}
 import 
org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.ApiMessage
+import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.controller.Controller
@@ -150,6 +152,57 @@ class ControllerApisTest {
   }
 
   @Test
+  def testFetchSentToKRaft(): Unit = {
+    when(
+      raftManager.handleRequest(
+        any(classOf[RequestHeader]),
+        any(classOf[ApiMessage]),
+        any(classOf[Long])
+      )
+    ).thenReturn(
+      new CompletableFuture()
+    )
+
+    createControllerApis(None, new MockController.Builder().build())
+      .handleFetch(buildRequest(new FetchRequest(new FetchRequestData(), 12)))
+
+    verify(raftManager).handleRequest(
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any()
+    )
+  }
+
+  @Test
+  def testUnauthorizedFetchSnapshot(): Unit = {
+    assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
+      Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+        handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new 
FetchSnapshotRequestData(), 0))))
+  }
+
+  @Test
+  def testFetchSnapshotSentToKRaft(): Unit = {
+    when(
+      raftManager.handleRequest(
+        any(classOf[RequestHeader]),
+        any(classOf[ApiMessage]),
+        any(classOf[Long])
+      )
+    ).thenReturn(
+      new CompletableFuture()
+    )
+
+    createControllerApis(None, new MockController.Builder().build())
+      .handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new 
FetchSnapshotRequestData(), 0)))
+
+    verify(raftManager).handleRequest(
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.any()
+    )
+  }
+
+  @Test
   def testUnauthorizedVote(): Unit = {
     assertThrows(classOf[ClusterAuthorizationException], () => 
createControllerApis(
       Some(createDenyAllAuthorizer()), new MockController.Builder().build()).

Reply via email to