This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 09b6b50 KAFKA-13080: Direct fetch snapshot request to kraft controller (#11041)
09b6b50 is described below
commit 09b6b506043c43f8c49e55249e9106e801ad4de5
Author: José Armando García Sancio <[email protected]>
AuthorDate: Tue Jul 13 16:14:15 2021 -0700
KAFKA-13080: Direct fetch snapshot request to kraft controller (#11041)
Reviewers: Colin P. McCabe <[email protected]>
---
.../main/scala/kafka/server/ControllerApis.scala | 6 +++
.../unit/kafka/server/ControllerApisTest.scala | 55 +++++++++++++++++++++-
2 files changed, 60 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 2c6557f..0eddbb4 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -81,6 +81,7 @@ class ControllerApis(val requestChannel: RequestChannel,
try {
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
+ case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
case ApiKeys.METADATA => handleMetadataRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
@@ -141,6 +142,11 @@ class ControllerApis(val requestChannel: RequestChannel,
handleRaftRequest(request, response => new FetchResponse(response.asInstanceOf[FetchResponseData]))
}
+ def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+ handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
+ }
+
def handleMetadataRequest(request: RequestChannel.Request): Unit = {
val metadataRequest = request.body[MetadataRequest]
def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 89fbd05..110ae72 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.Properties
+import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import kafka.network.RequestChannel
import kafka.raft.RaftManager
@@ -50,8 +51,9 @@ import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.protocol.ApiMessage
+import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.controller.Controller
@@ -150,6 +152,57 @@ class ControllerApisTest {
}
@Test
+ def testFetchSentToKRaft(): Unit = {
+ when(
+ raftManager.handleRequest(
+ any(classOf[RequestHeader]),
+ any(classOf[ApiMessage]),
+ any(classOf[Long])
+ )
+ ).thenReturn(
+ new CompletableFuture()
+ )
+
+ createControllerApis(None, new MockController.Builder().build())
+ .handleFetch(buildRequest(new FetchRequest(new FetchRequestData(), 12)))
+
+ verify(raftManager).handleRequest(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )
+ }
+
+ @Test
+ def testUnauthorizedFetchSnapshot(): Unit = {
+ assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
+ Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
+ handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new FetchSnapshotRequestData(), 0))))
+ }
+
+ @Test
+ def testFetchSnapshotSentToKRaft(): Unit = {
+ when(
+ raftManager.handleRequest(
+ any(classOf[RequestHeader]),
+ any(classOf[ApiMessage]),
+ any(classOf[Long])
+ )
+ ).thenReturn(
+ new CompletableFuture()
+ )
+
+ createControllerApis(None, new MockController.Builder().build())
+ .handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new FetchSnapshotRequestData(), 0)))
+
+ verify(raftManager).handleRequest(
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any(),
+ ArgumentMatchers.any()
+ )
+ }
+
+ @Test
def testUnauthorizedVote(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
Some(createDenyAllAuthorizer()), new MockController.Builder().build()).