This is an automated email from the ASF dual-hosted git repository.

saranyak pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a7458422 CASSSIDECAR-357: Add cache for Authorization layer (#270)
a7458422 is described below

commit a74584223105677f4514fa744989592689943b18
Author: Saranya Krishnakumar <[email protected]>
AuthorDate: Tue Oct 21 21:08:02 2025 -0700

    CASSSIDECAR-357: Add cache for Authorization layer (#270)
    
    Patch by Saranya Krishnakumar; reviewed by Francisco Guerrero, Yifan Cai 
for CASSSIDECAR-357
---
 CHANGES.txt                                        |   1 +
 gradle.properties                                  |   1 +
 integration-tests/build.gradle                     |   1 +
 .../RoleBasedAuthorizationIntegrationTest.java     | 243 +++++++-
 server/build.gradle                                |   2 +-
 .../acl/authorization/AuthorizationCacheKey.java   |  44 ++
 .../authorization/AuthorizationCacheKeyImpl.java   |  93 ++++
 .../AuthorizationWithAdminBypassHandler.java       |  67 ---
 .../authorization/CachedAuthorizationHandler.java  | 207 +++++++
 .../yaml/AccessControlConfigurationImpl.java       |   9 +-
 .../sidecar/metrics/server/AuthMetrics.java        |  15 +-
 .../sidecar/metrics/server/CacheMetrics.java       |   2 +
 .../cassandra/sidecar/modules/AuthModule.java      |   9 +-
 .../cassandra/sidecar/routes/RouteBuilder.java     |  53 +-
 .../cassandra/sidecar/utils/CacheFactory.java      |  62 ++-
 .../AuthorizationCacheKeyImplTest.java             | 280 ++++++++++
 .../CachedAuthorizationHandlerTest.java            | 610 +++++++++++++++++++++
 .../sidecar/handlers/RouteBuilderTest.java         |  43 +-
 .../cassandra/sidecar/utils/CacheFactoryTest.java  | 155 +++++-
 19 files changed, 1766 insertions(+), 131 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index acf5dbf6..14e7fcb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.3.0
 -----
+ * Add cache for Authorization layer (CASSSIDECAR-357)
  * Avoid creating objects in the CassandraAdapter implementation 
(CASSSIDECAR-355)
  * Code refactoring for some configuration classes and migrate 
RoleBasedAuthorizationIntegrationTest to integration-tests (CASSSIDECAR-351)
  * Fix restore job metrics and log adjustments (CASSSIDECAR-350)
diff --git a/gradle.properties b/gradle.properties
index 3beb14c5..822398d3 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -21,6 +21,7 @@ junitVersion=5.9.2
 vertxVersion=4.5.13
 nettyVersion=4.1.118.Final
 guavaVersion=27.0.1-jre
+caffeineVersion=2.9.3
 guiceVersion=7.0.0
 slf4jVersion=2.0.17
 logbackVersion=1.5.15
diff --git a/integration-tests/build.gradle b/integration-tests/build.gradle
index 1e093c6d..07d1c8e7 100644
--- a/integration-tests/build.gradle
+++ b/integration-tests/build.gradle
@@ -66,6 +66,7 @@ dependencies {
         exclude group: 'junit', module: 'junit'
     }
     
integrationTestImplementation("io.vertx:vertx-dropwizard-metrics:${project.vertxVersion}")
+    
integrationTestImplementation("com.github.ben-manes.caffeine:caffeine:${project.caffeineVersion}")
     
integrationTestImplementation("io.vertx:vertx-web-client:${project.vertxVersion}")
 
     integrationTestImplementation(group: 'net.java.dev.jna', name: 'jna', 
version: '5.9.0')
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/RoleBasedAuthorizationIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationIntegrationTest.java
similarity index 83%
rename from 
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/RoleBasedAuthorizationIntegrationTest.java
rename to 
integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationIntegrationTest.java
index 7e51a16e..b0b1f99c 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/RoleBasedAuthorizationIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/acl/authorization/RoleBasedAuthorizationIntegrationTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.acl;
+package org.apache.cassandra.sidecar.acl.authorization;
 
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
@@ -37,11 +37,14 @@ import org.slf4j.LoggerFactory;
 import com.datastax.driver.core.SSLOptions;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.ProvidesIntoMap;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.ext.web.client.HttpResponse;
@@ -82,6 +85,7 @@ import org.apache.cassandra.sidecar.testing.QualifiedName;
 import 
org.apache.cassandra.sidecar.testing.SharedClusterSidecarIntegrationTestBase;
 import org.apache.cassandra.sidecar.testing.SharedExecutorNettyOptions;
 import org.apache.cassandra.sidecar.testing.TemporaryCqlSessionProvider;
+import org.apache.cassandra.sidecar.utils.CacheFactory;
 import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
@@ -91,19 +95,18 @@ import static 
org.apache.cassandra.testing.TestUtils.DC1_RF1;
 import static org.apache.cassandra.testing.TlsTestUtils.getSSLOptions;
 import static 
org.apache.cassandra.testing.TlsTestUtils.withAuthenticatedSession;
 import static org.apache.cassandra.testing.utils.AssertionUtils.getBlocking;
-import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 /**
  * Test for role based access control in Sidecar
  * Note:
- * - Do not add new test cases in this class. Add them into test method, 
example refer to testForAdmin.
  * - Create a new keyspace or test role for each test method as required to 
prevent permissions overlapping
  */
 class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
 {
     protected static final int MIN_VERSION_WITH_MTLS = 5;
+
     private static final String ADMIN_IDENTITY = 
"spiffe://cassandra/sidecar/admin";
     // CASSANDRA_IDENTITY is only used to configure schemas for test setup, do 
not use this identity for anything else
     private static final String CASSANDRA_IDENTITY = 
"spiffe://cassandra/sidecar/cassandra_role";
@@ -121,6 +124,12 @@ class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrat
     public static final RoleWithIdentityTestScenario 
NON_ADMIN_READ_TEST_KEYSPACE_ROLE =
     new RoleWithIdentityTestScenario("non_admin_test_keyspace", 
"non_admin_test_role", "spiffe://cassandra/sidecar/non_admin_test_user")
     .addPermission("data/non_admin_test_keyspace", "SCHEMA:READ");
+    public static final RoleWithIdentityTestScenario 
NON_ADMIN_CACHE_REVOCATION_TEST_KEYSPACE_ROLE =
+    new 
RoleWithIdentityTestScenario("non_admin_cache_revocation_test_keyspace", 
"non_admin_test_role", "spiffe://cassandra/sidecar/non_admin_test_user")
+    .addPermission("data/non_admin_cache_revocation_test_keyspace", 
"SCHEMA:READ");
+    public static final RoleWithIdentityTestScenario 
NON_ADMIN_CACHE_FORBIDDEN_TEST_KEYSPACE_ROLE =
+    new 
RoleWithIdentityTestScenario("non_admin_cache_forbidden_test_keyspace", 
"non_admin_test_role", "spiffe://cassandra/sidecar/non_admin_test_user")
+    .addPermission("data/non_admin_cache_forbidden_test_keyspace", 
"GOSSIP:READ");
     public static final RoleWithIdentityTestScenario 
NON_ADMIN_CREATE_SNAPSHOT_TEST_KEYSPACE_ROLE =
     new RoleWithIdentityTestScenario("grant_table_test_keyspace", 
"non_admin_test_role", "spiffe://cassandra/sidecar/non_admin_test_user")
     .addPermission("data/grant_table_test_keyspace/test_table", 
"SNAPSHOT:CREATE");
@@ -171,6 +180,8 @@ class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrat
     static final List<RoleWithIdentityTestScenario> 
ROLE_WITH_IDENTITY_TEST_SCENARIOS = List.of(SUPERUSER,
                                                                                
                 NON_SUPERUSER_ROLE_WITH_TRANSITIVE_SUPERUSER_ROLE,
                                                                                
                 NON_ADMIN_READ_TEST_KEYSPACE_ROLE,
+                                                                               
                 NON_ADMIN_CACHE_REVOCATION_TEST_KEYSPACE_ROLE,
+                                                                               
                 NON_ADMIN_CACHE_FORBIDDEN_TEST_KEYSPACE_ROLE,
                                                                                
                 NON_ADMIN_CREATE_SNAPSHOT_TEST_KEYSPACE_ROLE,
                                                                                
                 NON_ADMIN_CREATE_SNAPSHOT_KEYSPACE_LEVEL_ROLE,
                                                                                
                 NON_ADMIN_CREATE_SNAPSHOT_ALL_TABLES_ROLE,
@@ -463,21 +474,20 @@ class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrat
                                     
"data/multiple_permissions_required_test_keyspace/test_table",
                                     "SNAPSHOT:READ");
 
-            // wait for cache refresh
-            loopAssert(2, 100, () -> {
-                verifyAccess(HttpMethod.GET, listSnapshotRoute, 
nonAdminClientKeystorePath, response -> {
-                    assertThat(response).isNotNull();
-                    
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
-                    ListSnapshotFilesResponse snapshotFiles = 
response.bodyAsJson(ListSnapshotFilesResponse.class);
-                    List<ListSnapshotFilesResponse.FileInfo> filesToStream =
-                    snapshotFiles.snapshotFilesInfo()
-                                 .stream()
-                                 .filter(info -> 
info.fileName.endsWith("-Data.db"))
-                                 .sorted(Comparator.comparing(o -> o.fileName))
-                                 .collect(Collectors.toList());
-                    assertThat(filesToStream).isNotNull().isNotEmpty();
-                    componentDownloadUrl[0] = 
filesToStream.get(0).componentDownloadUrl();
-                });
+            invalidateAuthorizationHandlerCaches();
+
+            verifyAccess(HttpMethod.GET, listSnapshotRoute, 
nonAdminClientKeystorePath, response -> {
+                assertThat(response).isNotNull();
+                
assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
+                ListSnapshotFilesResponse snapshotFiles = 
response.bodyAsJson(ListSnapshotFilesResponse.class);
+                List<ListSnapshotFilesResponse.FileInfo> filesToStream =
+                snapshotFiles.snapshotFilesInfo()
+                             .stream()
+                             .filter(info -> 
info.fileName.endsWith("-Data.db"))
+                             .sorted(Comparator.comparing(o -> o.fileName))
+                             .collect(Collectors.toList());
+                assertThat(filesToStream).isNotNull().isNotEmpty();
+                componentDownloadUrl[0] = 
filesToStream.get(0).componentDownloadUrl();
             });
 
             // grant sidecar permission for streaming
@@ -486,21 +496,22 @@ class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrat
                                     
"data/multiple_permissions_required_test_keyspace/test_table",
                                     "SNAPSHOT:STREAM");
 
+            invalidateAuthorizationHandlerCaches();
+
             // STREAM SSTable request requires both Sidecar SNAPSHOT:STREAM 
permission and Cassandra's SELECT
             // permission on a table it accesses data.
-            loopAssert(2, 100, () -> {
-                // request denied without SELECT permission
-                verifyAccess(HttpMethod.GET, componentDownloadUrl[0], 
nonAdminClientKeystorePath, assertStatus(HttpResponseStatus.FORBIDDEN));
-            });
+
+            // request denied without SELECT permission
+            verifyAccess(HttpMethod.GET, componentDownloadUrl[0], 
nonAdminClientKeystorePath, assertStatus(HttpResponseStatus.FORBIDDEN));
 
             // grant SELECT permission to non_admin_test_role
             grantTablePermission(session, 
"multiple_permissions_required_test_keyspace", "test_table", 
"non_admin_test_role");
         }, sslOptions);
 
-        loopAssert(2, 100, () -> {
-            // request denied without SELECT permission
-            verifyAccess(HttpMethod.GET, componentDownloadUrl[0], 
nonAdminClientKeystorePath, assertStatus(HttpResponseStatus.OK));
-        });
+        invalidateAuthorizationHandlerCaches();
+
+        // request goes through with SELECT permission
+        verifyAccess(HttpMethod.GET, componentDownloadUrl[0], 
nonAdminClientKeystorePath, assertStatus(HttpResponseStatus.OK));
     }
 
     @Test
@@ -656,6 +667,157 @@ class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrat
                                                
HttpResponseStatus.INTERNAL_SERVER_ERROR.code());
     }
 
+    /**
+     * Verifies that two identical requests from the same non-admin user produce exactly one miss and one
+     * hit in the authorization cache.
+     */
+    @Test
+    void testAuthorizationCaching()
+    {
+        SidecarMetrics metrics = serverWrapper.injector.getInstance(SidecarMetrics.class);
+
+        // Start from an empty authorization cache. The server (and therefore the cache) appears to be shared
+        // across the tests of this class, so an entry cached by another test for the same user and route
+        // would turn the expected miss into a hit.
+        invalidateAuthorizationHandlerCaches();
+
+        CacheStats baseline = metrics.server().cache().authorizationCacheMetrics.snapshot();
+
+        String keyspaceSchemaRoute = String.format("/api/v1/keyspaces/%s/schema", "non_admin_test_keyspace");
+
+        WebClient client = trustedClient(nonAdminClientKeystorePath.toString(), mtlsTestHelper.clientKeyStorePassword(),
+                                         mtlsTestHelper.trustStorePath(), mtlsTestHelper.trustStorePassword());
+        try
+        {
+            createMultipleRequests(client, HttpMethod.GET, keyspaceSchemaRoute, 2, HttpResponseStatus.OK.code());
+        }
+        finally
+        {
+            client.close();
+        }
+
+        // Verify cache stats, 1 hit 1 miss. Subtract the baseline so that only the lookups performed by this
+        // test are counted.
+        CacheStats callStats = metrics.server().cache().authorizationCacheMetrics.snapshot().minus(baseline);
+        assertThat(callStats.missCount()).isEqualTo(1);
+        assertThat(callStats.hitCount()).isEqualTo(1);
+    }
+
+    /**
+     * Verifies that a cached authorization decision is replaced once the permission has been revoked and the
+     * authorization cache has been invalidated: the same route first answers 200 and afterwards 403, and each
+     * batch of two requests accounts for one cache miss and one cache hit.
+     */
+    @Test
+    void testAuthorizationCachingWithPermissionRevocation()
+    {
+        SidecarMetrics metrics = serverWrapper.injector.getInstance(SidecarMetrics.class);
+
+        // Start from an empty authorization cache so that entries left behind by other tests cannot turn the
+        // expected miss into a hit
+        invalidateAuthorizationHandlerCaches();
+
+        CacheStats baseline = metrics.server().cache().authorizationCacheMetrics.snapshot();
+
+        String keyspaceSchemaRoute = String.format("/api/v1/keyspaces/%s/schema", "non_admin_cache_revocation_test_keyspace");
+
+        WebClient client = trustedClient(nonAdminClientKeystorePath.toString(), mtlsTestHelper.clientKeyStorePassword(),
+                                         mtlsTestHelper.trustStorePath(), mtlsTestHelper.trustStorePassword());
+
+        // The whole scenario runs inside try/finally so that the client is closed even when an early
+        // assertion or the revocation itself fails
+        try
+        {
+            createMultipleRequests(client, HttpMethod.GET, keyspaceSchemaRoute, 2, HttpResponseStatus.OK.code());
+
+            // Lookups performed by the first batch only
+            CacheStats callStats = metrics.server().cache().authorizationCacheMetrics.snapshot().minus(baseline);
+            assertThat(callStats.missCount()).isEqualTo(1);
+            assertThat(callStats.hitCount()).isEqualTo(1);
+
+            // Revoke permission
+            Path clientKeystorePath = cassandraIdentityClientKeyStore();
+            SSLOptions sslOptions = getSSLOptions(clientKeystorePath.toString(),
+                                                  mtlsTestHelper.clientKeyStorePassword(),
+                                                  mtlsTestHelper.trustStorePath(),
+                                                  mtlsTestHelper.trustStorePassword());
+            withAuthenticatedSession(cluster.get(1), "cassandra", "cassandra", session -> {
+                session.execute(String.format("DELETE FROM sidecar_internal.role_permissions_v1 " +
+                                              "WHERE role = '%s' AND resource = 'data/%s'", "non_admin_test_role",
+                                              "non_admin_cache_revocation_test_keyspace"));
+            }, sslOptions);
+
+            invalidateAuthorizationHandlerCaches();
+
+            // After the cache is invalidated, verify permission revocation takes effect
+            createMultipleRequests(client, HttpMethod.GET, keyspaceSchemaRoute, 2, HttpResponseStatus.FORBIDDEN.code());
+
+            // Only count the lookups made after the revocation: we should see a new miss and a hit for the
+            // subsequent call
+            CacheStats finalCallStats = metrics.server().cache().authorizationCacheMetrics.snapshot()
+                                               .minus(baseline)
+                                               .minus(callStats);
+            assertThat(finalCallStats.missCount()).isEqualTo(1);
+            assertThat(finalCallStats.hitCount()).isEqualTo(1);
+        }
+        finally
+        {
+            client.close();
+        }
+    }
+
+    /**
+     * Verifies that a denied authorization decision is cached as well: two identical requests that are both
+     * answered with 403 produce exactly one miss and one hit in the authorization cache.
+     */
+    @Test
+    void testAuthorizationCachingForForbiddenRequests()
+    {
+        SidecarMetrics metrics = serverWrapper.injector.getInstance(SidecarMetrics.class);
+
+        // Start from an empty authorization cache so that entries left behind by other tests cannot turn the
+        // expected miss into a hit
+        invalidateAuthorizationHandlerCaches();
+
+        CacheStats baseline = metrics.server().cache().authorizationCacheMetrics.snapshot();
+
+        String keyspaceSchemaRoute = String.format("/api/v1/keyspaces/%s/schema", "non_admin_cache_forbidden_test_keyspace");
+
+        // user has GOSSIP:READ permission but not SCHEMA:READ
+        WebClient client = trustedClient(nonAdminClientKeystorePath.toString(), mtlsTestHelper.clientKeyStorePassword(),
+                                         mtlsTestHelper.trustStorePath(), mtlsTestHelper.trustStorePassword());
+
+        try
+        {
+            createMultipleRequests(client, HttpMethod.GET, keyspaceSchemaRoute, 2, HttpResponseStatus.FORBIDDEN.code());
+        }
+        finally
+        {
+            client.close();
+        }
+
+        // Verify cache stats, 1 hit 1 miss. Subtract the baseline so that only the lookups performed by this
+        // test are counted.
+        CacheStats callStats = metrics.server().cache().authorizationCacheMetrics.snapshot().minus(baseline);
+        assertThat(callStats.missCount()).isEqualTo(1);
+        assertThat(callStats.hitCount()).isEqualTo(1);
+    }
+
+    /**
+     * Verifies that authorization decisions for the same user are cached per route: one request to each of
+     * two different routes produces two cache misses and no cache hit.
+     */
+    @Test
+    void testSameUserAccessingDifferentRoutes() throws Exception
+    {
+        SidecarMetrics metrics = serverWrapper.injector.getInstance(SidecarMetrics.class);
+
+        // Start from an empty authorization cache. Another test of this class requests the same keyspace
+        // schema route with the same client keystore, and a leftover entry would be reported as a hit.
+        invalidateAuthorizationHandlerCaches();
+
+        CacheStats baseline = metrics.server().cache().authorizationCacheMetrics.snapshot();
+
+        String keyspaceSchemaRoute = String.format("/api/v1/keyspaces/%s/schema", "non_admin_test_keyspace");
+        String createSnapshotRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/my-snapshot-different-access",
+                                                   "grant_table_test_keyspace", "test_table");
+
+        verifyAccess(HttpMethod.GET, keyspaceSchemaRoute, nonAdminClientKeystorePath, assertStatus(HttpResponseStatus.OK));
+        verifyAccess(HttpMethod.PUT, createSnapshotRoute, nonAdminClientKeystorePath, assertStatus(HttpResponseStatus.OK));
+
+        // Verify cache stats, both miss. Subtract the baseline so that only the lookups performed by this
+        // test are counted.
+        CacheStats callStats = metrics.server().cache().authorizationCacheMetrics.snapshot().minus(baseline);
+        assertThat(callStats.missCount()).isEqualTo(2);
+        assertThat(callStats.hitCount()).isEqualTo(0);
+    }
+
+    /**
+     * Verifies that two identical requests made with an admin identity succeed and produce exactly one miss
+     * and one hit in the authorization cache.
+     */
+    @Test
+    void testAdminBypassCaching() throws Exception
+    {
+        SidecarMetrics metrics = serverWrapper.injector.getInstance(SidecarMetrics.class);
+
+        // Start from an empty authorization cache so that entries left behind by other tests cannot turn the
+        // expected miss into a hit
+        invalidateAuthorizationHandlerCaches();
+
+        CacheStats baseline = metrics.server().cache().authorizationCacheMetrics.snapshot();
+
+        String keyspaceSchemaRoute = String.format("/api/v1/keyspaces/%s/schema", "non_admin_test_keyspace");
+        // Uses client keystore with admin identity. Configured admin identities bypass authorization checks
+        Path clientKeystorePath = mtlsTestHelper.issueClientKeyStore(certificateBuilder ->
+                                                                     certificateBuilder.addSanUriName(ADMIN_IDENTITY));
+
+        WebClient client = trustedClient(clientKeystorePath.toString(), mtlsTestHelper.clientKeyStorePassword(),
+                                         mtlsTestHelper.trustStorePath(), mtlsTestHelper.trustStorePassword());
+        try
+        {
+            createMultipleRequests(client, HttpMethod.GET, keyspaceSchemaRoute, 2, HttpResponseStatus.OK.code());
+        }
+        finally
+        {
+            client.close();
+        }
+
+        // Verify cache stats, 1 hit 1 miss. Subtract the baseline so that only the lookups performed by this
+        // test are counted.
+        CacheStats callStats = metrics.server().cache().authorizationCacheMetrics.snapshot().minus(baseline);
+        assertThat(callStats.missCount()).isEqualTo(1);
+        assertThat(callStats.hitCount()).isEqualTo(1);
+    }
+
     @Override
     protected void initializeSchemaForTest()
     {
@@ -744,6 +906,13 @@ class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrat
                                       "where role = '%s' and resource = '%s'", 
permission, role, resource));
     }
 
+    /**
+     * Discards every entry of the endpoint authorization cache obtained from the server's
+     * {@link CacheFactory}, so that the next request is authorized afresh.
+     */
+    private void invalidateAuthorizationHandlerCaches()
+    {
+        serverWrapper.injector.getInstance(CacheFactory.class)
+                              .endpointAuthorizationCache()
+                              .synchronous()
+                              .invalidateAll();
+    }
+
     private void verifyAccess(HttpMethod method, String testRoute, Path 
clientKeystorePath, Verifier<HttpResponse<Buffer>> assertions)
     {
         verifyAccess(method, testRoute, clientKeystorePath.toString(), 
assertions);
@@ -769,6 +938,28 @@ class RoleBasedAuthorizationIntegrationTest extends 
SharedClusterSidecarIntegrat
         return getBlocking(client.request(method, serverWrapper.serverPort, 
"127.0.0.1", route).send());
     }
 
+    /**
+     * Issues the same request {@code times} times without waiting in between, then blocks on every response
+     * in the order the requests were issued and asserts its status code.
+     *
+     * @param client               client used to send the requests
+     * @param method               HTTP method of the request
+     * @param route                route the request is sent to
+     * @param times                number of requests to issue
+     * @param expectedResponseCode status code every response is expected to carry
+     */
+    private void createMultipleRequests(WebClient client, HttpMethod method, String route, int times,
+                                        int expectedResponseCode)
+    {
+        List<Future<HttpResponse<Buffer>>> inFlight = new ArrayList<>();
+        for (int remaining = times; remaining > 0; remaining--)
+        {
+            inFlight.add(createNonBlockingRequest(client, method, route));
+        }
+
+        // Every request has been issued by now; wait for the responses one by one
+        for (Future<HttpResponse<Buffer>> pending : inFlight)
+        {
+            HttpResponse<Buffer> response = getBlocking(pending);
+            assertThat(response.statusCode()).isEqualTo(expectedResponseCode);
+        }
+    }
+
+    /**
+     * Sends a request to the Sidecar server under test on 127.0.0.1 and returns the pending response
+     * without waiting for it.
+     *
+     * @param client client used to send the request
+     * @param method HTTP method of the request
+     * @param route  route the request is sent to
+     * @return future of the response
+     */
+    private Future<HttpResponse<Buffer>> createNonBlockingRequest(WebClient client, HttpMethod method, String route)
+    {
+        int port = serverWrapper.serverPort;
+        String host = "127.0.0.1";
+        return client.request(method, port, host, route).send();
+    }
+
     private void configureAdminAndSidecarIdentity(IInstance instance)
     {
         for (int i = 0; i < 60; i++)
diff --git a/server/build.gradle b/server/build.gradle
index 74f36bdb..483b517a 100644
--- a/server/build.gradle
+++ b/server/build.gradle
@@ -95,7 +95,7 @@ dependencies {
 
     implementation('com.datastax.cassandra:cassandra-driver-core:3.11.3')
     implementation("com.google.inject:guice:${guiceVersion}")
-    implementation("com.github.ben-manes.caffeine:caffeine:2.9.3")
+    implementation("com.github.ben-manes.caffeine:caffeine:${caffeineVersion}")
 
     implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
     implementation("ch.qos.logback:logback-core:${project.logbackVersion}")
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKey.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKey.java
new file mode 100644
index 00000000..5b70012c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKey.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+
+/**
+ * Key that uniquely represents an authorization request, combining the user context with the resource
+ * context of the request.
+ */
+public interface AuthorizationCacheKey
+{
+    /**
+     * Creates an {@link AuthorizationCacheKey} from Vert.x's {@link AuthorizationContext}, using the
+     * context's user and variables together with the given {@code handlerId}.
+     * (Note: a dedicated key type is required because {@link AuthorizationContext} does not provide an
+     * equals comparison.)
+     *
+     * @param handlerId            id that uniquely represents the authorization handler used for a route
+     * @param authorizationContext instance of {@link AuthorizationContext}, used by Vert.x to represent an
+     *                             authorization request
+     * @return an {@link AuthorizationCacheKey} that uniquely represents the authorization request using
+     * user and resource context
+     */
+    static AuthorizationCacheKey create(int handlerId, AuthorizationContext 
authorizationContext)
+    {
+        return new AuthorizationCacheKeyImpl(handlerId, 
authorizationContext.user(), authorizationContext.variables());
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKeyImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKeyImpl.java
new file mode 100644
index 00000000..936fe5e0
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKeyImpl.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import io.vertx.ext.auth.User;
+
+import static 
org.apache.cassandra.sidecar.utils.AuthUtils.extractCassandraRoles;
+
+/**
+ * Implementation of {@link AuthorizationCacheKey}, uniquely represents an authorization request with user and resource
+ * context. Two keys are equal when their handler id, extracted Cassandra roles and flattened variables are equal.
+ */
+public class AuthorizationCacheKeyImpl implements AuthorizationCacheKey
+{
+    private final int handlerId;
+    private final List<String> roles;
+    private final Set<String> variables;
+    // precomputed in the constructor from handlerId, roles and variables
+    private final int hashCode;
+
+    /**
+     * Creates an instance of {@link AuthorizationCacheKeyImpl}
+     *
+     * @param handlerId an id used to uniquely represent an authorization handler used for a route
+     * @param user      {@link User} represents a user in Vert.x
+     * @param variables resource mappings associated with a user request, may be {@code null} or empty
+     */
+    public AuthorizationCacheKeyImpl(int handlerId, User user, Iterable<Map.Entry<String, String>> variables)
+    {
+        this.handlerId = handlerId;
+        this.roles = extractCassandraRoles(user);
+
+        if (variables == null || !variables.iterator().hasNext())
+        {
+            this.variables = Set.of();
+        }
+        else
+        {
+            // Vert.x HeadersMultimap and HeadersMultimap.MapEntry do not implement equals or hashCode,
+            // hence we store flattened variables in a Set
+            Set<String> flattenedVariables = new HashSet<>();
+            for (Map.Entry<String, String> entry : variables)
+            {
+                // We convert to lower case, since Vert.x Multimap representation is case insensitive for variables
+                // stored. Locale.ROOT keeps the conversion independent of the JVM's default locale.
+                flattenedVariables.add(entry.getKey().toLowerCase(java.util.Locale.ROOT) + ":" + entry.getValue());
+            }
+            this.variables = flattenedVariables;
+        }
+        this.hashCode = Objects.hash(this.handlerId, this.roles, this.variables);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+        AuthorizationCacheKeyImpl that = (AuthorizationCacheKeyImpl) o;
+        return handlerId == that.handlerId && roles.equals(that.roles) && variables.equals(that.variables);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return hashCode;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationWithAdminBypassHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationWithAdminBypassHandler.java
deleted file mode 100644
index 226b1390..00000000
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationWithAdminBypassHandler.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.acl.authorization;
-
-import java.util.List;
-
-import io.vertx.ext.auth.authorization.Authorization;
-import io.vertx.ext.web.RoutingContext;
-import io.vertx.ext.web.handler.impl.AuthorizationHandlerImpl;
-import org.apache.cassandra.sidecar.acl.AdminIdentityResolver;
-
-import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
-
-/**
- * Verifies user has required authorizations. Allows admin identities to 
bypass authorization checks.
- */
-public class AuthorizationWithAdminBypassHandler extends 
AuthorizationHandlerImpl
-{
-    private final AuthorizationParameterValidateHandler 
authZParameterValidateHandler;
-    private final AdminIdentityResolver adminIdentityResolver;
-
-    public 
AuthorizationWithAdminBypassHandler(AuthorizationParameterValidateHandler 
authZParameterValidateHandler,
-                                               AdminIdentityResolver 
adminIdentityResolver,
-                                               Authorization authorization)
-    {
-        super(authorization);
-        this.authZParameterValidateHandler = authZParameterValidateHandler;
-        this.adminIdentityResolver = adminIdentityResolver;
-    }
-
-    @Override
-    public void handle(RoutingContext ctx)
-    {
-        authZParameterValidateHandler.handle(ctx);
-        if (ctx.failed()) // failed due to validation
-        {
-            return;
-        }
-
-        List<String> identities = extractIdentities(ctx.user());
-
-        // Admin identities bypass route specific authorization checks
-        if (!identities.isEmpty() && 
identities.stream().anyMatch(adminIdentityResolver::isAdmin))
-        {
-            ctx.next();
-            return;
-        }
-
-        super.handle(ctx);
-    }
-}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandler.java
new file mode 100644
index 00000000..b61ff280
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandler.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import io.vertx.core.Future;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.AuthorizationHandler;
+import io.vertx.ext.web.handler.HttpException;
+import io.vertx.ext.web.handler.impl.AuthorizationHandlerImpl;
+import org.apache.cassandra.sidecar.acl.AdminIdentityResolver;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.server.AuthMetrics;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.vertx.core.Future.fromCompletionStage;
+import static org.apache.cassandra.sidecar.utils.AuthUtils.extractIdentities;
+
+/**
+ * {@link CachedAuthorizationHandler} caches all authorization requests using 
{@link AuthorizationCacheKey}.
+ */
+public class CachedAuthorizationHandler extends AuthorizationHandlerImpl
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CachedAuthorizationHandler.class);
+
+    // Uniquely identifies each CachedAuthorizationHandler across different routes. 
Having the same handlerId can lead
+    // to permission bypass across routes.
+    private static final AtomicInteger HANDLER_ID_GEN = new AtomicInteger(0);
+    private static final HttpException FORBIDDEN_EXCEPTION = new 
HttpException(403);
+    private final int handlerId;
+    private final AccessControlConfiguration accessControlConfiguration;
+    private final AuthorizationParameterValidateHandler 
authZParameterValidateHandler;
+    private final AdminIdentityResolver adminIdentityResolver;
+    private final AuthMetrics authMetrics;
+    private final AsyncCache<AuthorizationCacheKey, Boolean> 
authorizationCache;
+
+    // This is overridden since Vert.x does not expose this
+    private BiConsumer<RoutingContext, AuthorizationContext> variableHandler;
+
+    public CachedAuthorizationHandler(AccessControlConfiguration 
accessControlConfiguration,
+                                      AuthorizationParameterValidateHandler 
authZParameterValidateHandler,
+                                      AdminIdentityResolver 
adminIdentityResolver,
+                                      Authorization authorization,
+                                      SidecarMetrics sidecarMetrics,
+                                      AsyncCache<AuthorizationCacheKey, 
Boolean> authorizationCache)
+    {
+        this(HANDLER_ID_GEN.getAndIncrement(), accessControlConfiguration, 
authZParameterValidateHandler,
+             adminIdentityResolver, authorization, sidecarMetrics, 
authorizationCache);
+    }
+
+    @VisibleForTesting
+    public CachedAuthorizationHandler(int handlerId,
+                                      AccessControlConfiguration 
accessControlConfiguration,
+                                      AuthorizationParameterValidateHandler 
authZParameterValidateHandler,
+                                      AdminIdentityResolver 
adminIdentityResolver,
+                                      Authorization authorization,
+                                      SidecarMetrics sidecarMetrics,
+                                      AsyncCache<AuthorizationCacheKey, 
Boolean> authorizationCache)
+    {
+        super(authorization);
+        this.handlerId = handlerId;
+        this.accessControlConfiguration = accessControlConfiguration;
+        this.authZParameterValidateHandler = authZParameterValidateHandler;
+        this.adminIdentityResolver = adminIdentityResolver;
+        this.authMetrics = sidecarMetrics.server().auth();
+        this.authorizationCache = authorizationCache;
+    }
+
+    @Override
+    public void handle(RoutingContext ctx)
+    {
+        long startTimeNanos = System.nanoTime();
+        authZParameterValidateHandler.handle(ctx);
+        if (ctx.failed()) // failed due to validation
+        {
+            return;
+        }
+
+        AtomicBoolean ctxNextCalled = new AtomicBoolean(false);
+        Future<Boolean> authorizationFuture
+        = fromCompletionStage(checkAuthorization(ctx, ctxNextCalled, 
startTimeNanos));
+
+        authorizationFuture
+        .onSuccess(authorized -> {
+            // We avoid calling ctx.next() and ctx.fail() when it is already 
done during cache value computation
+            if (Boolean.TRUE.equals(authorized))
+            {
+                if (!ctxNextCalled.get())
+                {
+                    ctx.next();
+                }
+            }
+            else
+            {
+                if (!ctx.failed())
+                {
+                    ctx.fail(FORBIDDEN.code(), FORBIDDEN_EXCEPTION);
+                }
+            }
+        })
+        .onFailure(cause -> {
+            LOGGER.error("Error encountered during authorization cache 
computation", cause);
+            if (!ctx.failed())
+            {
+                ctx.fail(FORBIDDEN.code(), FORBIDDEN_EXCEPTION);
+            }
+        });
+    }
+
+    @Override
+    public AuthorizationHandler variableConsumer(BiConsumer<RoutingContext, 
AuthorizationContext> handler)
+    {
+        this.variableHandler = handler;
+        super.variableConsumer(handler);
+        return this;
+    }
+
+    private CompletableFuture<Boolean> checkAuthorization(RoutingContext ctx, 
AtomicBoolean ctxNextCalled,
+                                                          long startTimeNanos)
+    {
+        if 
(!this.accessControlConfiguration.permissionCacheConfiguration().enabled())
+        {
+            // We perform authorization checks every time if caching is disabled
+            return CompletableFuture.completedFuture(isUserAuthorized(ctx, 
ctxNextCalled, startTimeNanos));
+        }
+
+        AuthorizationCacheKey key = createAuthorizationKey(ctx);
+        return authorizationCache.get(key, k -> isUserAuthorized(ctx, 
ctxNextCalled, startTimeNanos));
+    }
+
+    private AuthorizationCacheKey createAuthorizationKey(RoutingContext ctx)
+    {
+        User user = ctx.user();
+        AuthorizationContext authorizationContext = 
AuthorizationContext.create(user);
+        if (this.variableHandler != null)
+        {
+            this.variableHandler.accept(ctx, authorizationContext);
+        }
+        return AuthorizationCacheKey.create(handlerId, authorizationContext);
+    }
+
+    private boolean isUserAuthorized(RoutingContext ctx, AtomicBoolean 
ctxNextCalled, long startTimeNanos)
+    {
+        User user = ctx.user();
+        List<String> identities = extractIdentities(user);
+
+        // Admin identities bypass route specific authorization checks
+        if (isAdmin(identities))
+        {
+            return true;
+        }
+
+        super.handle(ctx);
+        if (!ctx.failed())
+        {
+            ctxNextCalled.set(true);
+            long durationNanos = System.nanoTime() - startTimeNanos;
+            // authorization time recorded here only takes into account 
authorizations that are not cached
+            authMetrics.authorizationTime.metric.update(durationNanos, 
TimeUnit.NANOSECONDS);
+            return true;
+        }
+        return false;
+    }
+
+    private boolean isAdmin(List<String> identities)
+    {
+        for (String identity : identities)
+        {
+            if (adminIdentityResolver.isAdmin(identity))
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/AccessControlConfigurationImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/AccessControlConfigurationImpl.java
index 644e42eb..4069dca2 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/AccessControlConfigurationImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/AccessControlConfigurationImpl.java
@@ -40,10 +40,11 @@ public class AccessControlConfigurationImpl implements 
AccessControlConfiguratio
     private static final ParameterizedClassConfiguration 
DEFAULT_AUTHORIZER_CONFIGURATION
     = new 
ParameterizedClassConfigurationImpl(AllowAllAuthorizationProvider.class.getName(),
 Collections.emptyMap());
     private static final Set<String> DEFAULT_ADMIN_IDENTITIES = 
Collections.emptySet();
-    private static final CacheConfiguration 
DEFAULT_PERMISSION_CACHE_CONFIGURATION = CacheConfigurationImpl.builder()
-                                                                               
                            
.expireAfterAccess(MillisecondBoundConfiguration.parse("2h"))
-                                                                               
                            .maximumSize(1_000)
-                                                                               
                            .build();
+    private static final CacheConfiguration 
DEFAULT_PERMISSION_CACHE_CONFIGURATION
+    = CacheConfigurationImpl.builder()
+                            
.expireAfterAccess(MillisecondBoundConfiguration.parse("2h"))
+                            .maximumSize(1_000)
+                            .build();
 
     @JsonProperty(value = "enabled")
     protected final boolean enabled;
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/AuthMetrics.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/AuthMetrics.java
index 816fc536..b3333a04 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/AuthMetrics.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/AuthMetrics.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.sidecar.metrics.server;
 
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
 import org.apache.cassandra.sidecar.metrics.NamedMetric;
 
 import static 
org.apache.cassandra.sidecar.metrics.server.ServerMetrics.SERVER_PREFIX;
@@ -32,17 +33,27 @@ public class AuthMetrics
     private static final String DOMAIN = SERVER_PREFIX + ".Auth";
     public final NamedMetric<Counter> jwtPemRefreshFailures;
     public final NamedMetric<Counter> jwtPemRefreshSuccesses;
+    /**
+     * Captures the time to successfully authorize non-cached requests. Cached 
authorization requests will
+     * not be recorded in this metric.
+     */
+    public final NamedMetric<Timer> authorizationTime;
 
     public AuthMetrics(MetricRegistry metricRegistry)
     {
-        jwtPemRefreshFailures = NamedMetric.builder(name -> 
metricRegistry.counter(name))
+        jwtPemRefreshFailures = NamedMetric.builder(metricRegistry::counter)
                                         .withDomain(DOMAIN)
                                         .withName("JwtPemRefreshFailures")
                                         .build();
 
-        jwtPemRefreshSuccesses = NamedMetric.builder(name -> 
metricRegistry.counter(name))
+        jwtPemRefreshSuccesses = NamedMetric.builder(metricRegistry::counter)
                                          .withDomain(DOMAIN)
                                          .withName("JwtPemRefreshSuccesses")
                                          .build();
+
+        authorizationTime = NamedMetric.builder(metricRegistry::timer)
+                                       .withDomain(DOMAIN)
+                                       .withName("authorizationTime")
+                                       .build();
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
index 3c478884..84a7da4d 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/metrics/server/CacheMetrics.java
@@ -32,6 +32,7 @@ public class CacheMetrics
     public final CacheStatsCounter identityToRoleCacheMetrics;
     public final CacheStatsCounter superUserCacheMetrics;
     public final CacheStatsCounter rolePermissionsCacheMetrics;
+    public final CacheStatsCounter authorizationCacheMetrics;
 
     public CacheMetrics(MetricRegistry globalMetricRegistry)
     {
@@ -39,5 +40,6 @@ public class CacheMetrics
         identityToRoleCacheMetrics = new 
CacheStatsCounter(globalMetricRegistry, "identity_to_role_cache");
         superUserCacheMetrics = new CacheStatsCounter(globalMetricRegistry, 
"super_user_cache");
         rolePermissionsCacheMetrics = new 
CacheStatsCounter(globalMetricRegistry, "role_permissions_cache");
+        authorizationCacheMetrics = new 
CacheStatsCounter(globalMetricRegistry, "authorization_cache");
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java
index 5ffb5b57..dd27b4f3 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/modules/AuthModule.java
@@ -58,6 +58,7 @@ import 
org.apache.cassandra.sidecar.modules.multibindings.VertxRouteMapKeys;
 import org.apache.cassandra.sidecar.routes.RouteBuilder;
 import org.apache.cassandra.sidecar.routes.RoutingOrder;
 import org.apache.cassandra.sidecar.routes.VertxRoute;
+import org.apache.cassandra.sidecar.utils.CacheFactory;
 
 /**
  * Provides authentication and authorization (role-based) capability
@@ -85,12 +86,16 @@ public class AuthModule extends AbstractModule
     RouteBuilder.Factory 
accessProtectedRouteBuilderFactory(SidecarConfiguration sidecarConfiguration,
                                                             
AuthorizationProvider authorizationProvider,
                                                             
AdminIdentityResolver adminIdentityResolver,
-                                                            
AuthorizationParameterValidateHandler authorizationParameterValidateHandler)
+                                                            
AuthorizationParameterValidateHandler authorizationParameterValidateHandler,
+                                                            SidecarMetrics 
metrics,
+                                                            CacheFactory 
cacheFactory)
     {
         return new 
RouteBuilder.Factory(sidecarConfiguration.accessControlConfiguration(),
                                         authorizationProvider,
                                         adminIdentityResolver,
-                                        authorizationParameterValidateHandler);
+                                        authorizationParameterValidateHandler,
+                                        metrics,
+                                        
cacheFactory.endpointAuthorizationCache());
     }
 
     @Provides
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java 
b/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java
index 4dd8a3e5..cd25cfe6 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/routes/RouteBuilder.java
@@ -26,6 +26,9 @@ import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import com.github.benmanes.caffeine.cache.AsyncCache;
 import io.vertx.core.Handler;
 import io.vertx.core.http.HttpMethod;
 import io.vertx.ext.auth.authorization.AndAuthorization;
@@ -35,15 +38,18 @@ import 
io.vertx.ext.auth.authorization.AuthorizationProvider;
 import io.vertx.ext.web.Route;
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.AuthorizationHandler;
 import io.vertx.ext.web.handler.BodyHandler;
 import org.apache.cassandra.sidecar.acl.AdminIdentityResolver;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationCacheKey;
 import 
org.apache.cassandra.sidecar.acl.authorization.AuthorizationParameterValidateHandler;
-import 
org.apache.cassandra.sidecar.acl.authorization.AuthorizationWithAdminBypassHandler;
+import 
org.apache.cassandra.sidecar.acl.authorization.CachedAuthorizationHandler;
 import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.utils.Preconditions;
 import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
 import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
 import org.apache.cassandra.sidecar.handlers.AccessProtected;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 
 import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.KEYSPACE;
 import static org.apache.cassandra.sidecar.common.ApiEndpointsV1.TABLE;
@@ -58,20 +64,20 @@ public class RouteBuilder
     private final AuthorizationProvider authorizationProvider;
     private final AdminIdentityResolver adminIdentityResolver;
     private final AuthorizationParameterValidateHandler 
authZParameterValidateHandler;
-
+    private final SidecarMetrics sidecarMetrics;
+    private final AsyncCache<AuthorizationCacheKey, Boolean> 
authorizationCache;
     private boolean setBodyHandler;
     private boolean accessProtected = true;
     private final List<Handler<RoutingContext>> handlers = new ArrayList<>();
 
-    private RouteBuilder(AccessControlConfiguration accessControlConfiguration,
-                         AuthorizationProvider authorizationProvider,
-                         AdminIdentityResolver adminIdentityResolver,
-                         AuthorizationParameterValidateHandler 
authZParameterValidateHandler)
+    private RouteBuilder(Factory factory)
     {
-        this.accessControlConfiguration = accessControlConfiguration;
-        this.authorizationProvider = authorizationProvider;
-        this.adminIdentityResolver = adminIdentityResolver;
-        this.authZParameterValidateHandler = authZParameterValidateHandler;
+        this.accessControlConfiguration = factory.accessControlConfiguration;
+        this.authorizationProvider = factory.authorizationProvider;
+        this.adminIdentityResolver = factory.adminIdentityResolver;
+        this.authZParameterValidateHandler = 
factory.authZParameterValidateHandler;
+        this.sidecarMetrics = factory.sidecarMetrics;
+        this.authorizationCache = factory.authorizationCache;
     }
 
     /**
@@ -138,9 +144,10 @@ public class RouteBuilder
                 if (accessControlConfiguration.enabled())
                 {
                     // authorization handler added before route specific 
handler chain
-                    AuthorizationWithAdminBypassHandler authorizationHandler
-                    = new 
AuthorizationWithAdminBypassHandler(authZParameterValidateHandler, 
adminIdentityResolver,
-                                                              
requiredAuthorization());
+                    AuthorizationHandler authorizationHandler =
+                    new CachedAuthorizationHandler(accessControlConfiguration, 
authZParameterValidateHandler,
+                                                   adminIdentityResolver, 
requiredAuthorization(), sidecarMetrics,
+                                                   authorizationCache);
                     
authorizationHandler.addAuthorizationProvider(authorizationProvider);
                     
authorizationHandler.variableConsumer(routeGenericVariableConsumer());
 
@@ -152,7 +159,8 @@ public class RouteBuilder
         };
     }
 
-    private Authorization requiredAuthorization()
+    @VisibleForTesting
+    public Authorization requiredAuthorization()
     {
         Set<Authorization> requiredAuthorizations = handlers
                                                     .stream()
@@ -173,7 +181,8 @@ public class RouteBuilder
         return andAuthorization;
     }
 
-    private BiConsumer<RoutingContext, AuthorizationContext> 
routeGenericVariableConsumer()
+    @VisibleForTesting
+    public BiConsumer<RoutingContext, AuthorizationContext> 
routeGenericVariableConsumer()
     {
         return (routingCtx, authZContext) -> {
             Optional<QualifiedTableName> optional = 
RoutingContextUtils.getAsOptional(routingCtx, SC_QUALIFIED_TABLE_NAME);
@@ -206,25 +215,27 @@ public class RouteBuilder
         private final AuthorizationProvider authorizationProvider;
         private final AdminIdentityResolver adminIdentityResolver;
         private final AuthorizationParameterValidateHandler 
authZParameterValidateHandler;
+        private final SidecarMetrics sidecarMetrics;
+        private final AsyncCache<AuthorizationCacheKey, Boolean> 
authorizationCache;
 
         public Factory(AccessControlConfiguration accessControlConfiguration,
                        AuthorizationProvider authorizationProvider,
                        AdminIdentityResolver adminIdentityResolver,
-                       AuthorizationParameterValidateHandler 
authZParameterValidateHandler)
+                       AuthorizationParameterValidateHandler 
authZParameterValidateHandler,
+                       SidecarMetrics sidecarMetrics,
+                       AsyncCache<AuthorizationCacheKey, Boolean> 
authorizationCache)
         {
-
             this.accessControlConfiguration = accessControlConfiguration;
             this.authorizationProvider = authorizationProvider;
             this.adminIdentityResolver = adminIdentityResolver;
             this.authZParameterValidateHandler = authZParameterValidateHandler;
+            this.sidecarMetrics = sidecarMetrics;
+            this.authorizationCache = authorizationCache;
         }
 
         public RouteBuilder builderForRoute()
         {
-            return new RouteBuilder(accessControlConfiguration,
-                                    authorizationProvider,
-                                    adminIdentityResolver,
-                                    authZParameterValidateHandler);
+            return new RouteBuilder(this);
         }
 
         public VertxRoute buildRouteWithHandler(Handler<RoutingContext> 
handler)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java 
b/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
index 72e8ef98..01515324 100644
--- a/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
+++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CacheFactory.java
@@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.github.benmanes.caffeine.cache.AsyncCache;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.RemovalListener;
@@ -29,8 +30,11 @@ import com.github.benmanes.caffeine.cache.Ticker;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationCacheKey;
 import org.apache.cassandra.sidecar.config.CacheConfiguration;
-import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
 import org.jetbrains.annotations.VisibleForTesting;
 
 /**
@@ -42,19 +46,25 @@ public class CacheFactory
     private static final Logger LOGGER = 
LoggerFactory.getLogger(CacheFactory.class);
 
     private final Cache<SSTableImporter.ImportOptions, Future<Void>> 
ssTableImportCache;
+    private final AsyncCache<AuthorizationCacheKey, Boolean> 
endpointAuthorizationCache;
 
     @Inject
-    public CacheFactory(ServiceConfiguration configuration, SSTableImporter 
ssTableImporter)
+    public CacheFactory(SidecarConfiguration configuration,
+                        SSTableImporter ssTableImporter,
+                        SidecarMetrics sidecarMetrics)
     {
-        this(configuration, ssTableImporter, Ticker.systemTicker());
+        this(configuration, ssTableImporter, sidecarMetrics, 
Ticker.systemTicker());
     }
 
     @VisibleForTesting
-    CacheFactory(ServiceConfiguration configuration, SSTableImporter 
ssTableImporter, Ticker ticker)
+    CacheFactory(SidecarConfiguration configuration, SSTableImporter 
ssTableImporter, SidecarMetrics sidecarMetrics,
+                 Ticker ticker)
     {
-        this.ssTableImportCache = 
initSSTableImportCache(configuration.sstableImportConfiguration()
+        this.ssTableImportCache = 
initSSTableImportCache(configuration.serviceConfiguration()
+                                                                      
.sstableImportConfiguration()
                                                                       
.cacheConfiguration(),
                                                          ssTableImporter, 
ticker);
+        this.endpointAuthorizationCache = 
initEndpointAuthorizationCache(configuration, sidecarMetrics, ticker);
     }
 
     /**
@@ -65,6 +75,14 @@ public class CacheFactory
         return ssTableImportCache;
     }
 
+    /**
+     * @return the cache used for authorization requests
+     */
+    public AsyncCache<AuthorizationCacheKey, Boolean> 
endpointAuthorizationCache()
+    {
+        return endpointAuthorizationCache;
+    }
+
     /**
      * Initializes the SSTable Import Cache using the provided {@code 
configuration} and {@code ticker}
      * for the cache
@@ -95,4 +113,38 @@ public class CacheFactory
                        )
                        .build();
     }
+
+    /**
+     * Initializes the Authorization Cache using the provided {@code 
configuration}. We want to create only one
+     * instance of authorization cache.
+     *
+     * @param sidecarConfiguration the Sidecar configuration
+     * @param sidecarMetrics       the Sidecar metrics registry
+     * @return instance of {@link AsyncCache} for caching authorization 
requests
+     */
+    private AsyncCache<AuthorizationCacheKey, Boolean> 
initEndpointAuthorizationCache(SidecarConfiguration sidecarConfiguration,
+                                                                               
       SidecarMetrics sidecarMetrics,
+                                                                               
       Ticker ticker)
+    {
+        if (!sidecarConfiguration.accessControlConfiguration().enabled()
+            || 
!sidecarConfiguration.accessControlConfiguration().permissionCacheConfiguration().enabled())
+        {
+            return null;
+        }
+        CacheConfiguration permissionCacheConfig = 
sidecarConfiguration.accessControlConfiguration()
+                                                                       
.permissionCacheConfiguration();
+        if (permissionCacheConfig.expireAfterAccess() == null)
+        {
+            throw new ConfigurationException("Authorization handler cache must 
be configured with expireAfterAccess");
+        }
+        LOGGER.info("Building Authorization Cache with expireAfterAccess={}, 
maxSize={}",
+                    permissionCacheConfig.expireAfterAccess(), 
permissionCacheConfig.maximumSize());
+        return Caffeine.newBuilder()
+                       .ticker(ticker)
+                       
.expireAfterAccess(permissionCacheConfig.expireAfterAccess().quantity(),
+                                          
permissionCacheConfig.expireAfterAccess().unit())
+                       .maximumSize(permissionCacheConfig.maximumSize())
+                       .recordStats(() -> 
sidecarMetrics.server().cache().authorizationCacheMetrics)
+                       .buildAsync();
+    }
 }
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKeyImplTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKeyImplTest.java
new file mode 100644
index 00000000..f1faa110
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/AuthorizationCacheKeyImplTest.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import io.vertx.core.MultiMap;
+import io.vertx.ext.auth.User;
+
+import static 
org.apache.cassandra.sidecar.utils.AuthUtils.CASSANDRA_ROLES_ATTRIBUTE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link AuthorizationCacheKeyImpl}
+ */
+class AuthorizationCacheKeyImplTest
+{
+    @Test
+    void testEqualsSameUserSameVariables()
+    {
+        User user1 = createUser("user1", List.of("role1", "role2"));
+        User user2 = createUser("user1", List.of("role1", "role2"));
+
+        MultiMap variables1 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+        MultiMap variables2 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        assertThat(key1).isEqualTo(key2);
+        assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testEqualsSameUserNullVariables()
+    {
+        User user1 = createUser("user1", List.of("role1", "role2"));
+        User user2 = createUser("user1", List.of("role1", "role2"));
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, null);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, null);
+
+        assertThat(key1).isEqualTo(key2);
+        assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testEqualsEmptyVariables()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = MultiMap.caseInsensitiveMultiMap();
+        MultiMap variables2 = MultiMap.caseInsensitiveMultiMap();
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        assertThat(key1).isEqualTo(key2);
+        assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testNotEqualsDifferentRoles()
+    {
+        User user1 = createUser("user1", List.of("role1", "role2"));
+        User user2 = createUser("user1", List.of("role1", "role3"));
+
+        MultiMap variables = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables);
+
+        assertThat(key1).isNotEqualTo(key2);
+        assertThat(key1.hashCode()).isNotEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testNotEqualsDifferentVariables()
+    {
+        User user1 = createUser("user1", List.of("role1", "role2"));
+        User user2 = createUser("user1", List.of("role1", "role2"));
+
+        MultiMap variables1 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+        MultiMap variables2 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks2");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        assertThat(key1).isNotEqualTo(key2);
+        assertThat(key1.hashCode()).isNotEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testNotEqualsOneWithVariablesOneWithout()
+    {
+        User user1 = createUser("user1", List.of("role1", "role2"));
+        User user2 = createUser("user1", List.of("role1", "role2"));
+
+        MultiMap variables1 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, null);
+
+        assertThat(key1).isNotEqualTo(key2);
+        assertThat(key1.hashCode()).isNotEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testVariablesMutationDoesNotAffectKey()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+        MultiMap variables2 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        // Mutate original MultiMap
+        variables1.add("table", "tb1");
+
+        // Keys should still be equal because AuthorizationCacheKeyImpl 
creates a copy
+        assertThat(key1).isEqualTo(key2);
+    }
+
+    @Test
+    void testEqualsMultipleVariablesSameOrder()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("keyspace", "ks1")
+                                      .add("table", "tb1");
+        MultiMap variables2 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("keyspace", "ks1")
+                                      .add("table", "tb1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        assertThat(key1).isEqualTo(key2);
+        assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testEqualsMultipleVariablesDifferentOrder()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("keyspace", "ks1")
+                                      .add("table", "tb1");
+        MultiMap variables2 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("table", "tb1")
+                                      .add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        // Should be equal regardless of insertion order
+        assertThat(key1).isEqualTo(key2);
+        assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testEqualsVariablesWithMultipleValues()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("keyspace", "ks1")
+                                      .add("keyspace", "ks1");
+        MultiMap variables2 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("keyspace", "ks1")
+                                      .add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        assertThat(key1).isEqualTo(key2);
+        assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testNotEqualsVariablesWithDifferentMultipleValues()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("keyspace", "ks1")
+                                      .add("keyspace", "ks2");
+        MultiMap variables2 = MultiMap.caseInsensitiveMultiMap()
+                                      .add("keyspace", "ks1")
+                                      .add("keyspace", "ks3");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        assertThat(key1).isNotEqualTo(key2);
+        assertThat(key1.hashCode()).isNotEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testEqualsCaseInsensitiveVariables()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = 
MultiMap.caseInsensitiveMultiMap().add("Keyspace", "ks1");
+        MultiMap variables2 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables2);
+
+        assertThat(key1).isEqualTo(key2);
+    }
+
+    @Test
+    void testEqualsEmptyRolesList()
+    {
+        User user1 = createUser("user1", Collections.emptyList());
+        User user2 = createUser("user1", Collections.emptyList());
+
+        MultiMap variables = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(1, 
user2, variables);
+
+        assertThat(key1).isEqualTo(key2);
+        assertThat(key1.hashCode()).isEqualTo(key2.hashCode());
+    }
+
+    @Test
+    void testNotEqualsDifferentHandlerId()
+    {
+        User user1 = createUser("user1", List.of("role1"));
+        User user2 = createUser("user1", List.of("role1"));
+
+        MultiMap variables1 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+        MultiMap variables2 = 
MultiMap.caseInsensitiveMultiMap().add("keyspace", "ks1");
+
+        AuthorizationCacheKeyImpl key1 = new AuthorizationCacheKeyImpl(1, 
user1, variables1);
+        AuthorizationCacheKeyImpl key2 = new AuthorizationCacheKeyImpl(2, 
user2, variables2);
+
+        assertThat(key1).isNotEqualTo(key2);
+        assertThat(key1.hashCode()).isNotEqualTo(key2.hashCode());
+    }
+
+    private User createUser(String username, List<String> roles)
+    {
+        User user = User.fromName(username);
+        user.attributes().put(CASSANDRA_ROLES_ATTRIBUTE_NAME, roles);
+        return user;
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandlerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandlerTest.java
new file mode 100644
index 00000000..6fe4b3ea
--- /dev/null
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/acl/authorization/CachedAuthorizationHandlerTest.java
@@ -0,0 +1,610 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.acl.authorization;
+
+import java.util.List;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.AndAuthorization;
+import io.vertx.ext.auth.authorization.Authorization;
+import io.vertx.ext.auth.authorization.AuthorizationProvider;
+import io.vertx.ext.auth.authorization.PermissionBasedAuthorization;
+import io.vertx.ext.web.RoutingContext;
+import org.apache.cassandra.sidecar.acl.AdminIdentityResolver;
+import org.apache.cassandra.sidecar.common.server.data.QualifiedTableName;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.config.CacheConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
+import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
+import org.apache.cassandra.sidecar.routes.RouteBuilder;
+import org.apache.cassandra.sidecar.routes.RoutingContextUtils;
+import org.apache.cassandra.sidecar.utils.CacheFactory;
+import org.apache.cassandra.sidecar.utils.SSTableImporter;
+
+import static 
org.apache.cassandra.sidecar.utils.AuthUtils.CASSANDRA_ROLES_ATTRIBUTE_NAME;
+import static org.apache.cassandra.sidecar.utils.TestMetricUtils.registry;
+import static org.apache.cassandra.testing.utils.AssertionUtils.loopAssert;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link CachedAuthorizationHandler}
+ */
+class CachedAuthorizationHandlerTest
+{
+    private SidecarConfiguration sidecarConfiguration;
+    private AccessControlConfiguration mockAccessControlConfig;
+    private AuthorizationParameterValidateHandler mockValidateHandler;
+    private AdminIdentityResolver mockAdminIdentityResolver;
+    private CacheConfiguration mockCacheConfig;
+    private SidecarMetrics metrics;
+    private Authorization testAuthorization;
+    private RouteBuilder.Factory routeBuilderFactory;
+    private SSTableImporter sstableImporter;
+
+    /**
+     * Creates the mocks, metrics and route builder factory shared by the tests. By default access control and
+     * the permission cache are enabled, with an expire-after-access of {@code 3s} and a maximum size of 1000.
+     */
+    @BeforeEach
+    void setUp()
+    {
+        mockAccessControlConfig = mock(AccessControlConfiguration.class);
+        mockValidateHandler = mock(AuthorizationParameterValidateHandler.class);
+        mockAdminIdentityResolver = mock(AdminIdentityResolver.class);
+        mockCacheConfig = mock(CacheConfiguration.class);
+
+        MetricRegistryFactory registryFactory
+        = new MetricRegistryFactory("cassandra_sidecar", List.of(), List.of());
+        metrics = new SidecarMetricsImpl(registryFactory, null);
+
+        // Default cache configuration
+        when(mockCacheConfig.expireAfterAccess()).thenReturn(MillisecondBoundConfiguration.parse("3s"));
+        when(mockCacheConfig.maximumSize()).thenReturn(1000L);
+        when(mockCacheConfig.enabled()).thenReturn(true);
+        when(mockAccessControlConfig.enabled()).thenReturn(true);
+        when(mockAccessControlConfig.permissionCacheConfiguration()).thenReturn(mockCacheConfig);
+
+        testAuthorization = AndAuthorization.create()
+                                            .addAuthorization(PermissionBasedAuthorization.create("MODIFY"));
+
+        AuthorizationProvider mockAuthorizationProvider = mock(AuthorizationProvider.class);
+
+        ServiceConfiguration serviceConfiguration = new ServiceConfigurationImpl();
+        sidecarConfiguration = mock(SidecarConfiguration.class);
+        when(sidecarConfiguration.serviceConfiguration()).thenReturn(serviceConfiguration);
+        when(sidecarConfiguration.accessControlConfiguration()).thenReturn(mockAccessControlConfig);
+        sstableImporter = mock(SSTableImporter.class);
+        CacheFactory cacheFactory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+
+        routeBuilderFactory = new RouteBuilder.Factory(mockAccessControlConfig, mockAuthorizationProvider,
+                                                       mockAdminIdentityResolver, mockValidateHandler, metrics,
+                                                       cacheFactory.endpointAuthorizationCache());
+
+        // Take baseline before first call. A snapshot call refreshes cache miss and cache hits. But it does not
+        // reset load success count or load failure count. The call is made only for that refresh, so the
+        // returned stats are intentionally not kept
+        metrics.server().cache().authorizationCacheMetrics.snapshot();
+    }
+
+    /**
+     * Removes every metric from the registry returned by {@code registry()} once a test has finished
+     */
+    @AfterEach
+    void tearDown()
+    {
+        // The filter accepts all entries, hence nothing is left registered
+        registry().removeMatching((metricName, registeredMetric) -> true);
+    }
+
+    @Test
+    void testAdminBypassesAuthorization()
+    {
+        // The expiry must be overridden before the cache factory reads the cache configuration
+        when(mockCacheConfig.expireAfterAccess()).thenReturn(MillisecondBoundConfiguration.parse("5m"));
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+
+        when(mockAdminIdentityResolver.isAdmin("admin-identity1")).thenReturn(true);
+
+        // The route requires CREATE, the request is made with an admin identity
+        Authorization createPermission = PermissionBasedAuthorization.create("CREATE");
+        CachedAuthorizationHandler adminHandler
+        = new CachedAuthorizationHandler(1, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         createPermission, metrics, factory.endpointAuthorizationCache());
+
+        verifySuccess(adminHandler, createMockContext("admin-user", "admin-identity1", "admin-role1"));
+    }
+
+    @Test
+    void testMultipleIdentitiesOneIsAdmin()
+    {
+        // Only the second of the two identities is resolved as admin
+        when(mockAdminIdentityResolver.isAdmin("identity1")).thenReturn(false);
+        when(mockAdminIdentityResolver.isAdmin("admin-identity2")).thenReturn(true);
+
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(2, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        List<String> identities = List.of("identity1", "admin-identity2");
+        List<String> roles = List.of("admin-role2");
+        RoutingContext context = createMockContext("user1", identities, roles);
+
+        verifySuccess(underTest, context);
+    }
+
+    @Test
+    void testNonAdminRequiresAuthorization()
+    {
+        when(mockAdminIdentityResolver.isAdmin("identity2")).thenReturn(false);
+
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(3, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        // A non-admin identity has to pass through the authorization check of the handler
+        RoutingContext nonAdminContext = createMockContext("user2", "identity2", "role2");
+        verifySuccess(underTest, nonAdminContext);
+    }
+
+    @Test
+    void testNonAdminDifferentPermissionForbidden()
+    {
+        when(mockAdminIdentityResolver.isAdmin("identity3")).thenReturn(false);
+
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+
+        // This handler demands CREATE rather than the MODIFY based default authorization of the tests
+        Authorization createPermission = PermissionBasedAuthorization.create("CREATE");
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(4, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         createPermission, metrics, factory.endpointAuthorizationCache());
+
+        RoutingContext nonAdminContext = createMockContext("user3", "identity3", "role3");
+        verifyFailure(underTest, nonAdminContext);
+    }
+
+    @Test
+    void testCacheHitSameUser()
+    {
+        when(mockAdminIdentityResolver.isAdmin("identity4")).thenReturn(false);
+
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(5, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        RoutingContext initialContext = createMockContext("user4", "identity4", "role4");
+        RoutingContext repeatedContext = createMockContext("user4", "identity4", "role4");
+
+        // The first request goes through verifySuccess before the repeated requests are made
+        verifySuccess(underTest, initialContext);
+
+        for (int call = 1; call <= 5; call++)
+        {
+            underTest.handle(repeatedContext);
+        }
+
+        verify(repeatedContext, times(5)).next();
+
+        // All five repeated requests are expected to be served from the cache
+        CacheStats statsAfterRepeats = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterRepeats.missCount()).isEqualTo(0);
+        assertThat(statsAfterRepeats.hitCount()).isEqualTo(5);
+    }
+
+    @Test
+    void testCacheMissDifferentUsers()
+    {
+        when(mockAdminIdentityResolver.isAdmin(any())).thenReturn(false);
+
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(6, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        // Same user name, but the identity and the role differ between the two requests
+        RoutingContext firstContext = createMockContext("user5", "identity5", "role5");
+        RoutingContext secondContext = createMockContext("user5", "identity6", "role6");
+
+        verifySuccess(underTest, firstContext);
+
+        underTest.handle(secondContext);
+
+        loopAssert(2, 100, () -> verify(secondContext).next());
+
+        // The second request is expected to be recorded as a miss and not as a hit
+        CacheStats statsAfterSecondRequest = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterSecondRequest.missCount()).isEqualTo(1);
+        assertThat(statsAfterSecondRequest.hitCount()).isEqualTo(0);
+    }
+
+    @Test
+    void testCacheHitSameUserSameResource()
+    {
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(7, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        underTest.variableConsumer(routeBuilderFactory.builderForRoute().routeGenericVariableConsumer());
+
+        // Both requests are made by the same user against the same table
+        QualifiedTableName sharedTable = new QualifiedTableName("ks", "tbl");
+
+        RoutingContext firstContext = createMockContext("user6", "identity7", "role7");
+        when(firstContext.get("SC_QUALIFIED_TABLE_NAME")).thenReturn(sharedTable);
+
+        RoutingContext secondContext = createMockContext("user6", "identity7", "role7");
+        RoutingContextUtils.put(secondContext, RoutingContextUtils.SC_QUALIFIED_TABLE_NAME, sharedTable);
+        when(secondContext.get("SC_QUALIFIED_TABLE_NAME")).thenReturn(sharedTable);
+
+        verifySuccess(underTest, firstContext);
+
+        underTest.handle(secondContext);
+
+        // The second request is expected to be recorded as a hit
+        CacheStats statsAfterSecondRequest = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterSecondRequest.missCount()).isEqualTo(0);
+        assertThat(statsAfterSecondRequest.hitCount()).isEqualTo(1);
+    }
+
+    @Test
+    void testCacheMissSameUserDifferentResources()
+    {
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(8, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        underTest.variableConsumer(routeBuilderFactory.builderForRoute().routeGenericVariableConsumer());
+
+        // Same user for both requests, each request targets a different table
+        RoutingContext firstContext = createMockContext("user7", "identity8", "role8");
+        QualifiedTableName firstTable = new QualifiedTableName("ks1", "tbl1");
+        when(firstContext.get("SC_QUALIFIED_TABLE_NAME")).thenReturn(firstTable);
+
+        RoutingContext secondContext = createMockContext("user7", "identity8", "role8");
+        QualifiedTableName secondTable = new QualifiedTableName("ks2", "tbl2");
+        when(secondContext.get("SC_QUALIFIED_TABLE_NAME")).thenReturn(secondTable);
+
+        when(mockAdminIdentityResolver.isAdmin(any())).thenReturn(false);
+
+        verifySuccess(underTest, firstContext);
+
+        // Snapshot taken between the two requests, used as the reference for the miss count
+        CacheStats statsBeforeSecondRequest = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        underTest.handle(secondContext);
+
+        CacheStats statsAfterSecondRequest = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterSecondRequest.missCount() - statsBeforeSecondRequest.missCount()).isEqualTo(1);
+        assertThat(statsAfterSecondRequest.hitCount()).isEqualTo(0);
+    }
+
+    @Test
+    void testValidationFailure()
+    {
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(9, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        RoutingContext context = createMockContext("user8", "identity9", "role9");
+
+        // Whenever the validate handler is invoked, it marks the routing context it received as failed
+        doAnswer(invocation -> {
+            RoutingContext validatedContext = invocation.getArgument(0);
+            when(validatedContext.failed()).thenReturn(true);
+            return null;
+        }).when(mockValidateHandler).handle(any(RoutingContext.class));
+
+        underTest.handle(context);
+
+        // A failed validation must stop the request, next() is never reached
+        verify(context, times(0)).next();
+
+        // Neither a hit nor a miss is expected to be recorded when validation fails
+        CacheStats statsAfterFailure = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterFailure.missCount()).isEqualTo(0);
+        assertThat(statsAfterFailure.hitCount()).isEqualTo(0);
+    }
+
+    @Test
+    void testEmptyIdentities()
+    {
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler modifyHandler
+        = new CachedAuthorizationHandler(10, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        // First request: a user without identities or roles against the MODIFY based handler
+        RoutingContext modifyContext = createMockContext("user9", List.of(), List.of());
+        when(mockAdminIdentityResolver.isAdmin(any())).thenReturn(false);
+
+        verifySuccess(modifyHandler, modifyContext);
+
+        // Second request: another user without identities or roles against a handler requiring CREATE
+        Authorization createPermission = PermissionBasedAuthorization.create("CREATE");
+        CachedAuthorizationHandler createHandler
+        = new CachedAuthorizationHandler(11, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         createPermission, metrics, factory.endpointAuthorizationCache());
+
+        RoutingContext createContext = createMockContext("user10", List.of(), List.of());
+        createHandler.handle(createContext);
+
+        CacheStats statsAfterCreateRequest = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterCreateRequest.missCount()).isEqualTo(1);
+        assertThat(statsAfterCreateRequest.hitCount()).isEqualTo(0);
+    }
+
+    @Test
+    void testAuthorizationComputedEveryTimeWhenCacheDisabled()
+    {
+        // The cache has to be switched off before the cache factory reads the configuration
+        when(mockCacheConfig.enabled()).thenReturn(false);
+
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+        CachedAuthorizationHandler underTest
+        = new CachedAuthorizationHandler(12, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        // Three separate requests, all made by the same user
+        RoutingContext[] contexts = {
+            createMockContext("user11", "identity11", "role11"),
+            createMockContext("user11", "identity11", "role11"),
+            createMockContext("user11", "identity11", "role11")
+        };
+
+        when(mockAdminIdentityResolver.isAdmin("identity11")).thenReturn(false);
+
+        // Requests are handled one after the other, waiting for next() each time
+        for (RoutingContext context : contexts)
+        {
+            underTest.handle(context);
+            loopAssert(2, 100, () -> verify(context).next());
+        }
+
+        // With the cache disabled neither hits nor misses are expected to be recorded
+        CacheStats statsAfterRequests = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterRequests.hitCount()).isEqualTo(0);
+        assertThat(statsAfterRequests.missCount()).isEqualTo(0);
+
+        // Each of the three requests must have reached next() exactly once
+        for (RoutingContext context : contexts)
+        {
+            verify(context, times(1)).next();
+        }
+    }
+
+    @Test
+    void testDifferentHandlerIdsPreventsSharedCacheEntries()
+    {
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+
+        // Handler with id 100 is configured with the MODIFY based authorization
+        CachedAuthorizationHandler modifyHandler
+        = new CachedAuthorizationHandler(100, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         testAuthorization, metrics, factory.endpointAuthorizationCache());
+
+        // Handler with id 200 is configured with a CREATE based authorization
+        Authorization createPermission = AndAuthorization.create()
+                                                         .addAuthorization(PermissionBasedAuthorization.create("CREATE"));
+        CachedAuthorizationHandler createHandler
+        = new CachedAuthorizationHandler(200, mockAccessControlConfig, mockValidateHandler, mockAdminIdentityResolver,
+                                         createPermission, metrics, factory.endpointAuthorizationCache());
+
+        // One user makes a request to each of the two routes
+        RoutingContext modifyContext = createMockContext("user12", "identity12", "role12");
+        RoutingContext createContext = createMockContext("user12", "identity12", "role12");
+
+        when(mockAdminIdentityResolver.isAdmin("identity12")).thenReturn(false);
+
+        // The request against the MODIFY handler is expected to proceed to next()
+        modifyHandler.handle(modifyContext);
+        loopAssert(2, 100, () -> verify(modifyContext).next());
+
+        CacheStats statsAfterModify = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterModify.missCount()).isEqualTo(1L);
+        assertThat(statsAfterModify.hitCount()).isEqualTo(0L);
+
+        // Same user, different handlerId: the entry cached for the first handler must NOT be reused,
+        // so another miss and no hit is expected
+        createHandler.handle(createContext);
+
+        CacheStats statsAfterCreate = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterCreate.missCount()).isEqualTo(1L);
+        assertThat(statsAfterCreate.hitCount()).isEqualTo(0L);
+
+        // A further request against the first handler is expected to hit the entry it cached earlier
+        RoutingContext repeatedModifyContext = createMockContext("user12", "identity12", "role12");
+        modifyHandler.handle(repeatedModifyContext);
+
+        CacheStats statsAfterRepeat = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(statsAfterRepeat.hitCount()).isEqualTo(1L);
+    }
+
+    /**
+     * Two handler instances built with the same handler id and the same required authorization are
+     * expected to resolve to one cache entry: the first request is a miss, the second one a hit.
+     */
+    @Test
+    void testSameHandlerIdSamePermissionSharesCacheEntries()
+    {
+        final int commonHandlerId = 300;
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+
+        // Both instances are constructed identically, including the handler id
+        CachedAuthorizationHandler firstHandler
+        = new CachedAuthorizationHandler(commonHandlerId, mockAccessControlConfig, mockValidateHandler,
+                                         mockAdminIdentityResolver, testAuthorization, metrics,
+                                         factory.endpointAuthorizationCache());
+        CachedAuthorizationHandler secondHandler
+        = new CachedAuthorizationHandler(commonHandlerId, mockAccessControlConfig, mockValidateHandler,
+                                         mockAdminIdentityResolver, testAuthorization, metrics,
+                                         factory.endpointAuthorizationCache());
+
+        when(mockAdminIdentityResolver.isAdmin("identity13")).thenReturn(false);
+        RoutingContext firstRequest = createMockContext("user13", "identity13", "role13");
+        RoutingContext secondRequest = createMockContext("user13", "identity13", "role13");
+
+        // Request through the first instance: nothing is cached yet
+        firstHandler.handle(firstRequest);
+        loopAssert(2, 100, () -> verify(firstRequest).next());
+
+        CacheStats afterFirst = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(afterFirst.missCount()).isEqualTo(1L);
+        assertThat(afterFirst.hitCount()).isEqualTo(0L);
+
+        // Request through the second instance: the shared handler id leads to the cached entry
+        secondHandler.handle(secondRequest);
+        loopAssert(2, 100, () -> verify(secondRequest).next());
+
+        CacheStats afterSecond = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(afterSecond.hitCount()).isEqualTo(1L);
+        assertThat(afterSecond.missCount()).isEqualTo(0L);
+    }
+
+    /**
+     * Two handler instances share a handler id but require different authorizations (the second one
+     * requires CREATE). The test expects the second request to be served from the entry cached by the
+     * first one, i.e. the required authorization does not contribute to the cache key.
+     */
+    @Test
+    void testSameHandlerIdDifferentPermissionsStillSharesCacheEntries()
+    {
+        final int commonHandlerId = 300;
+        CacheFactory factory = new CacheFactory(sidecarConfiguration, sstableImporter, metrics);
+
+        // Authorization required by the second handler only
+        Authorization createOnly = AndAuthorization.create()
+                                                   .addAuthorization(PermissionBasedAuthorization.create("CREATE"));
+
+        CachedAuthorizationHandler firstHandler
+        = new CachedAuthorizationHandler(commonHandlerId, mockAccessControlConfig, mockValidateHandler,
+                                         mockAdminIdentityResolver, testAuthorization, metrics,
+                                         factory.endpointAuthorizationCache());
+        CachedAuthorizationHandler secondHandler
+        = new CachedAuthorizationHandler(commonHandlerId, mockAccessControlConfig, mockValidateHandler,
+                                         mockAdminIdentityResolver, createOnly, metrics,
+                                         factory.endpointAuthorizationCache());
+
+        when(mockAdminIdentityResolver.isAdmin("identity13")).thenReturn(false);
+        RoutingContext firstRequest = createMockContext("user13", "identity13", "role13");
+        RoutingContext secondRequest = createMockContext("user13", "identity13", "role13");
+
+        // Request through the first instance: nothing is cached yet
+        firstHandler.handle(firstRequest);
+        loopAssert(2, 100, () -> verify(firstRequest).next());
+
+        CacheStats afterFirst = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(afterFirst.missCount()).isEqualTo(1L);
+        assertThat(afterFirst.hitCount()).isEqualTo(0L);
+
+        // Request through the second instance: same handler id, so the cached entry is reused
+        secondHandler.handle(secondRequest);
+        loopAssert(2, 100, () -> verify(secondRequest).next());
+
+        CacheStats afterSecond = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(afterSecond.hitCount()).isEqualTo(1L);
+        assertThat(afterSecond.missCount()).isEqualTo(0L);
+    }
+
+    /**
+     * Convenience overload for a user with exactly one identity and one role.
+     *
+     * @param username name of the user attached to the context
+     * @param identity the user's single identity
+     * @param role     the user's single role
+     * @return a mocked routing context carrying that user
+     */
+    private RoutingContext createMockContext(String username, String identity, String role)
+    {
+        List<String> identities = List.of(identity);
+        List<String> roles = List.of(role);
+        return createMockContext(username, identities, roles);
+    }
+
+    /**
+     * Builds a mocked {@link RoutingContext} whose request is not ended, whose {@code pause()} and
+     * {@code resume()} return the request itself, and whose user is created from the given values.
+     * The context reports {@code failed() == false} until {@code fail(int, Throwable)} is invoked.
+     *
+     * @param username   name of the user attached to the context
+     * @param identities identities of the user
+     * @param roles      roles of the user
+     * @return the mocked routing context
+     */
+    private RoutingContext createMockContext(String username, List<String> identities, List<String> roles)
+    {
+        HttpServerRequest request = mock(HttpServerRequest.class);
+        when(request.isEnded()).thenReturn(false);
+        when(request.pause()).thenReturn(request);
+        when(request.resume()).thenReturn(request);
+
+        RoutingContext context = mock(RoutingContext.class);
+        when(context.request()).thenReturn(request);
+        User user = createMockUser(username, identities, roles);
+        when(context.user()).thenReturn(user);
+
+        // failed() is false initially and flips to true as soon as fail(...) has been called
+        when(context.failed()).thenReturn(false);
+        doAnswer(call -> {
+            when(context.failed()).thenReturn(true);
+            return context;
+        }).when(context).fail(any(Integer.class), any(Throwable.class));
+
+        return context;
+    }
+
+    /**
+     * Creates a user that stores the comma-joined identities in its principal, the roles in its
+     * attributes, and holds the MODIFY permission under the "test-provider" authorization provider id.
+     *
+     * @param username   name of the user
+     * @param identities identities, stored comma separated under the "identities" principal key
+     * @param roles      roles, stored under {@code CASSANDRA_ROLES_ATTRIBUTE_NAME}
+     * @return the populated user
+     */
+    private User createMockUser(String username, List<String> identities, List<String> roles)
+    {
+        User user = User.fromName(username);
+        String joinedIdentities = String.join(",", identities);
+        user.principal().put("identities", joinedIdentities);
+        user.attributes().put(CASSANDRA_ROLES_ATTRIBUTE_NAME, roles);
+        user.authorizations().add("test-provider", PermissionBasedAuthorization.create("MODIFY"));
+        return user;
+    }
+
+    /**
+     * Runs the full request verification expecting success; the status code is passed as 0.
+     *
+     * @param handler     handler under test
+     * @param mockContext mocked routing context to send through the handler
+     */
+    private void verifyRequest(CachedAuthorizationHandler handler, RoutingContext mockContext)
+    {
+        boolean expectSuccess = true;
+        int noStatusCode = 0;
+        verifyRequest(handler, mockContext, expectSuccess, noStatusCode);
+    }
+
+    /**
+     * Verifies that requests sent through {@code handler} succeed and are cached after the first call.
+     *
+     * @param handler     handler under test
+     * @param mockContext mocked routing context to send through the handler
+     */
+    private void verifySuccess(CachedAuthorizationHandler handler, RoutingContext mockContext)
+    {
+        // Same arguments the two-argument verifyRequest overload forwards
+        verifyRequest(handler, mockContext, true, 0);
+    }
+
+    /**
+     * Verifies that requests sent through {@code handler} fail with the FORBIDDEN status code and
+     * that the result is cached after the first call.
+     *
+     * @param handler     handler under test
+     * @param mockContext mocked routing context to send through the handler
+     */
+    private void verifyFailure(CachedAuthorizationHandler handler, RoutingContext mockContext)
+    {
+        int forbidden = HttpResponseStatus.FORBIDDEN.code();
+        verifyRequest(handler, mockContext, false, forbidden);
+    }
+
+    /**
+     * Sends the same request through {@code handler} six times and checks both the outcome and the
+     * authorization cache statistics: the first call must be a cache miss, the following five calls
+     * must be cache hits.
+     *
+     * @param handler     handler under test
+     * @param mockContext mocked routing context, reused for all six calls
+     * @param success     {@code true} if every call is expected to reach {@code next()}, {@code false} if
+     *                    every call is expected to end in {@code fail(statusCode, ...)}
+     * @param statusCode  status code expected in {@code fail(...)}; only checked when {@code success}
+     *                    is {@code false}
+     */
+    private void verifyRequest(CachedAuthorizationHandler handler, RoutingContext mockContext,
+                               boolean success, int statusCode)
+    {
+        // Taking a snapshot resets the hit and miss counts, so the assertions below only observe the
+        // calls made in this method. The returned stats are deliberately discarded. A snapshot does not
+        // reset the load success count or the load failure count.
+        metrics.server().cache().authorizationCacheMetrics.snapshot();
+
+        handler.handle(mockContext);
+
+        if (success)
+        {
+            loopAssert(2, 100, () -> verify(mockContext).next());
+        }
+        else
+        {
+            loopAssert(2, 100, () -> verify(mockContext, times(1)).fail(eq(statusCode), any(Throwable.class)));
+        }
+
+        // The first request cannot be served from the cache: exactly one miss and no hit
+        CacheStats firstCallStats = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(firstCallStats.missCount()).isEqualTo(1);
+        assertThat(firstCallStats.hitCount()).isEqualTo(0);
+
+        for (int i = 0; i < 5; i++)
+        {
+            // Reset failed state before each subsequent call to allow handler to process
+            when(mockContext.failed()).thenReturn(false);
+            handler.handle(mockContext);
+        }
+
+        // One initial call plus five repeated calls
+        if (success)
+        {
+            loopAssert(2, 100, () -> verify(mockContext, times(6)).next());
+        }
+        else
+        {
+            loopAssert(2, 100, () -> verify(mockContext, times(6)).fail(eq(statusCode), any(Throwable.class)));
+        }
+
+        // All five repeated requests must be served from the cache
+        CacheStats multipleCallStats = metrics.server().cache().authorizationCacheMetrics.snapshot();
+        assertThat(multipleCallStats.missCount()).isEqualTo(0);
+        assertThat(multipleCallStats.hitCount()).isEqualTo(5);
+    }
+}
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/RouteBuilderTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/RouteBuilderTest.java
index f8aeb3a0..43acc5a2 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/handlers/RouteBuilderTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/handlers/RouteBuilderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.sidecar.handlers;
 
+import java.util.List;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -31,14 +32,24 @@ import io.vertx.ext.web.RoutingContext;
 import org.apache.cassandra.sidecar.acl.AdminIdentityResolver;
 import 
org.apache.cassandra.sidecar.acl.authorization.AuthorizationParameterValidateHandler;
 import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
+import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
+import org.apache.cassandra.sidecar.config.CacheConfiguration;
+import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
 import org.apache.cassandra.sidecar.routes.RouteBuilder;
 import org.apache.cassandra.sidecar.routes.RouteBuilder.Factory;
 import org.apache.cassandra.sidecar.routes.SettableVertxRoute;
+import org.apache.cassandra.sidecar.utils.CacheFactory;
 import org.apache.cassandra.sidecar.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
+import org.apache.cassandra.sidecar.utils.SSTableImporter;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
@@ -49,15 +60,30 @@ import static org.mockito.Mockito.when;
  */
 class RouteBuilderTest
 {
+    MetricRegistryFactory metricRegistryFactory = new 
MetricRegistryFactory("cassandra_sidecar", List.of(), List.of());
+    SidecarMetrics metrics = new SidecarMetricsImpl(metricRegistryFactory, 
null);
+
     @Test
     void testRequiredParameters()
     {
+        CacheConfiguration permissionCacheConfiguration = 
mock(CacheConfiguration.class);
+        when(permissionCacheConfiguration.maximumSize()).thenReturn(5L);
+        
when(permissionCacheConfiguration.expireAfterAccess()).thenReturn(MillisecondBoundConfiguration.parse("5m"));
         AccessControlConfiguration mockConfig = 
mock(AccessControlConfiguration.class);
+        
when(mockConfig.permissionCacheConfiguration()).thenReturn(permissionCacheConfiguration);
         AuthorizationProvider mockAuthorizationProvider = 
mock(AuthorizationProvider.class);
         AdminIdentityResolver mockAdminIdentityResolver = 
mock(AdminIdentityResolver.class);
         AuthorizationParameterValidateHandler mockHandler = 
mock(AuthorizationParameterValidateHandler.class);
 
-        Factory factory = new Factory(mockConfig, mockAuthorizationProvider, 
mockAdminIdentityResolver, mockHandler);
+        ServiceConfiguration serviceConfiguration = new 
ServiceConfigurationImpl();
+        SidecarConfiguration sidecarConfiguration = 
mock(SidecarConfiguration.class);
+        
when(sidecarConfiguration.serviceConfiguration()).thenReturn(serviceConfiguration);
+        
when(sidecarConfiguration.accessControlConfiguration()).thenReturn(mockConfig);
+        SSTableImporter sstableImporter = mock(SSTableImporter.class);
+        CacheFactory cacheFactory = new CacheFactory(sidecarConfiguration, 
sstableImporter, metrics);
+
+        Factory factory = new Factory(mockConfig, mockAuthorizationProvider, 
mockAdminIdentityResolver,
+                                      mockHandler, metrics, 
cacheFactory.endpointAuthorizationCache());
         RouteBuilder routeBuilder = factory.builderForRoute();
         Router mockRouter = mock(Router.class);
         SettableVertxRoute route = routeBuilder.build();
@@ -107,12 +133,25 @@ class RouteBuilderTest
                            Function<Factory, RouteBuilder> 
routeBuilderFunction,
                            Consumer<SettableVertxRoute> test)
     {
+        CacheConfiguration permissionCacheConfiguration = 
mock(CacheConfiguration.class);
+        when(permissionCacheConfiguration.maximumSize()).thenReturn(5L);
+        
when(permissionCacheConfiguration.expireAfterAccess()).thenReturn(MillisecondBoundConfiguration.parse("5m"));
         AccessControlConfiguration mockConfig = 
mock(AccessControlConfiguration.class);
+        
when(mockConfig.permissionCacheConfiguration()).thenReturn(permissionCacheConfiguration);
         when(mockConfig.enabled()).thenReturn(true);
         AuthorizationProvider mockAuthorizationProvider = 
mock(AuthorizationProvider.class);
         AdminIdentityResolver mockAdminIdentityResolver = 
mock(AdminIdentityResolver.class);
         AuthorizationParameterValidateHandler mockHandler = 
mock(AuthorizationParameterValidateHandler.class);
-        Factory factory = new Factory(mockConfig, mockAuthorizationProvider, 
mockAdminIdentityResolver, mockHandler);
+
+        ServiceConfiguration serviceConfiguration = new 
ServiceConfigurationImpl();
+        SidecarConfiguration sidecarConfiguration = 
mock(SidecarConfiguration.class);
+        
when(sidecarConfiguration.serviceConfiguration()).thenReturn(serviceConfiguration);
+        
when(sidecarConfiguration.accessControlConfiguration()).thenReturn(mockConfig);
+        SSTableImporter sstableImporter = mock(SSTableImporter.class);
+        CacheFactory cacheFactory = new CacheFactory(sidecarConfiguration, 
sstableImporter, metrics);
+
+        Factory factory = new Factory(mockConfig, mockAuthorizationProvider, 
mockAdminIdentityResolver,
+                                      mockHandler, metrics, 
cacheFactory.endpointAuthorizationCache());
         RouteBuilder routeBuilder = routeBuilderFunction.apply(factory);
         SettableVertxRoute route = routeBuilder.handler(handler).build();
         test.accept(route);
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java
index 3dacee89..1a93206d 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/utils/CacheFactoryTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.utils;
 
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -28,18 +30,33 @@ import com.google.common.testing.FakeTicker;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.github.benmanes.caffeine.cache.AsyncCache;
 import com.github.benmanes.caffeine.cache.Cache;
 import io.vertx.core.Future;
+import io.vertx.core.MultiMap;
+import io.vertx.ext.auth.User;
+import io.vertx.ext.auth.authorization.AuthorizationContext;
+import org.apache.cassandra.sidecar.acl.authorization.AuthorizationCacheKey;
 import 
org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
+import org.apache.cassandra.sidecar.config.AccessControlConfiguration;
 import org.apache.cassandra.sidecar.config.CacheConfiguration;
 import org.apache.cassandra.sidecar.config.SSTableImportConfiguration;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.config.yaml.AccessControlConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.CacheConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SSTableImportConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.TestServiceConfiguration;
+import org.apache.cassandra.sidecar.exceptions.ConfigurationException;
+import org.apache.cassandra.sidecar.metrics.MetricRegistryFactory;
+import org.apache.cassandra.sidecar.metrics.SidecarMetrics;
+import org.apache.cassandra.sidecar.metrics.SidecarMetricsImpl;
 
+import static 
org.apache.cassandra.sidecar.utils.AuthUtils.CASSANDRA_ROLES_ATTRIBUTE_NAME;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Unit tests for the {@link CacheFactory} class
@@ -47,7 +64,10 @@ import static org.mockito.Mockito.mock;
 class CacheFactoryTest
 {
     static final MillisecondBoundConfiguration 
SSTABLE_IMPORT_EXPIRE_AFTER_ACCESS = MillisecondBoundConfiguration.parse("2h");
+    static final MillisecondBoundConfiguration 
AUTHORIZATION_EXPIRE_AFTER_ACCESS = MillisecondBoundConfiguration.parse("5m");
+    static final MetricRegistryFactory FACTORY = new 
MetricRegistryFactory(CacheFactoryTest.class.getName(), List.of(), List.of());
     static final long SSTABLE_IMPORT_CACHE_MAX_SIZE = 10L;
+    static final long AUTHORIZATION_CACHE_MAX_SIZE = 10L;
     private CacheFactory cacheFactory;
     private FakeTicker fakeTicker;
 
@@ -61,14 +81,29 @@ class CacheFactoryTest
                                                                                
    .maximumSize(SSTABLE_IMPORT_CACHE_MAX_SIZE)
                                                                                
    .build();
 
+        CacheConfiguration authorizationCacheConfiguration = 
CacheConfigurationImpl.builder()
+                                                                               
    .expireAfterAccess(AUTHORIZATION_EXPIRE_AFTER_ACCESS)
+                                                                               
    .maximumSize(AUTHORIZATION_CACHE_MAX_SIZE)
+                                                                               
    .enabled(true)
+                                                                               
    .build();
+
         SSTableImportConfiguration ssTableImportConfiguration =
         new SSTableImportConfigurationImpl(ssTableImportCacheConfiguration);
         ServiceConfiguration serviceConfiguration =
         TestServiceConfiguration.builder()
                                 
.sstableImportConfiguration(ssTableImportConfiguration)
                                 .build();
+        AccessControlConfiguration accessControlConfiguration
+        = AccessControlConfigurationImpl.builder()
+                                        .enabled(true)
+                                        
.permissionCacheConfiguration(authorizationCacheConfiguration)
+                                        .build();
+        SidecarConfiguration sidecarConfiguration = 
mock(SidecarConfiguration.class);
+        
when(sidecarConfiguration.accessControlConfiguration()).thenReturn(accessControlConfiguration);
+        
when(sidecarConfiguration.serviceConfiguration()).thenReturn(serviceConfiguration);
         SSTableImporter mockSSTableImporter = mock(SSTableImporter.class);
-        cacheFactory = new CacheFactory(serviceConfiguration, 
mockSSTableImporter, fakeTicker::read);
+        SidecarMetrics sidecarMetrics = new SidecarMetricsImpl(FACTORY, null);
+        cacheFactory = new CacheFactory(sidecarConfiguration, 
mockSSTableImporter, sidecarMetrics, fakeTicker::read);
     }
 
     @Test
@@ -179,6 +214,102 @@ class CacheFactoryTest
         assertThat(ssTableImportCacheEntry(cache, importOptions, 
mock(Void.class))).isNotSameAs(voidArray[0]);
     }
 
+    /**
+     * Exercises expiry of the endpoint authorization cache with the fake ticker. The cache factory under
+     * test is configured with {@code AUTHORIZATION_EXPIRE_AFTER_ACCESS} (5 minutes) for this cache.
+     * Every lookup uses a freshly created but equal key.
+     */
+    @Test
+    void testEndpointAuthorizationCacheExpiration() throws ExecutionException, InterruptedException
+    {
+        AsyncCache<AuthorizationCacheKey, Boolean> authorizationCache = cacheFactory.endpointAuthorizationCache();
+
+        // Populate two unrelated entries
+        AuthorizationCacheKey initialKey = createAuthorizationCacheKey(1, "user1", List.of("role1"), "ks1", "tbl1");
+        AuthorizationCacheKey unrelatedKey = createAuthorizationCacheKey(2, "user2", List.of("role2"), "ks2", "tbl2");
+        Boolean initialValue = authorizationCacheEntry(authorizationCache, initialKey, true);
+        Boolean unrelatedValue = authorizationCacheEntry(authorizationCache, unrelatedKey, false);
+
+        assertThat(initialValue).isTrue();
+        assertThat(unrelatedValue).isFalse();
+
+        // 2 minutes after the initial load: the entry is still cached, the offered value is ignored
+        fakeTicker.advance(2, TimeUnit.MINUTES);
+        AuthorizationCacheKey keyAfterTwoMinutes
+        = createAuthorizationCacheKey(1, "user1", List.of("role1"), "ks1", "tbl1");
+        Boolean valueAfterTwoMinutes = authorizationCacheEntry(authorizationCache, keyAfterTwoMinutes, false);
+        assertThat(valueAfterTwoMinutes).isTrue();
+
+        // a further 1 minute and 59 seconds (3m 59s since the initial load): still cached
+        fakeTicker.advance(1, TimeUnit.MINUTES);
+        fakeTicker.advance(59, TimeUnit.SECONDS);
+        AuthorizationCacheKey keyBeforeExpiry
+        = createAuthorizationCacheKey(1, "user1", List.of("role1"), "ks1", "tbl1");
+        Boolean valueBeforeExpiry = authorizationCacheEntry(authorizationCache, keyBeforeExpiry, false);
+        assertThat(valueBeforeExpiry).isTrue();
+
+        // 10 minutes without any access: the entry has expired and the offered value is loaded
+        fakeTicker.advance(10, TimeUnit.MINUTES);
+        AuthorizationCacheKey keyAfterExpiry
+        = createAuthorizationCacheKey(1, "user1", List.of("role1"), "ks1", "tbl1");
+        Boolean valueAfterExpiry = authorizationCacheEntry(authorizationCache, keyAfterExpiry, false);
+        assertThat(valueAfterExpiry).isFalse();
+    }
+
+    /**
+     * Keys that differ only in the handler id must map to separate cache entries, so each lookup
+     * loads the value offered for its own key.
+     */
+    @Test
+    void testEndpointAuthorizationCacheDifferentKeys() throws ExecutionException, InterruptedException
+    {
+        AsyncCache<AuthorizationCacheKey, Boolean> authorizationCache = cacheFactory.endpointAuthorizationCache();
+
+        // Identical user, roles, keyspace and table; only the handler id differs
+        AuthorizationCacheKey keyForHandlerOne
+        = createAuthorizationCacheKey(1, "user1", List.of("role1"), "ks1", "tbl1");
+        AuthorizationCacheKey keyForHandlerTwo
+        = createAuthorizationCacheKey(2, "user1", List.of("role1"), "ks1", "tbl1");
+
+        Boolean valueForHandlerOne = authorizationCacheEntry(authorizationCache, keyForHandlerOne, true);
+        Boolean valueForHandlerTwo = authorizationCacheEntry(authorizationCache, keyForHandlerTwo, false);
+
+        assertThat(valueForHandlerOne).isTrue();
+        // a shared entry would have returned the cached true here
+        assertThat(valueForHandlerTwo).isFalse();
+    }
+
+    /**
+     * Constructing a {@link CacheFactory} must fail with a {@link ConfigurationException} when the
+     * permission cache is enabled but its configuration is built without {@code expireAfterAccess}.
+     */
+    @Test
+    void testEndpointAuthorizationCacheThrowsExceptionWhenExpireAfterAccessIsMissing()
+    {
+        // Fully specified SSTable import cache configuration
+        CacheConfiguration importCacheConfiguration
+        = CacheConfigurationImpl.builder()
+                                .expireAfterAccess(SSTABLE_IMPORT_EXPIRE_AFTER_ACCESS)
+                                .maximumSize(SSTABLE_IMPORT_CACHE_MAX_SIZE)
+                                .build();
+        SSTableImportConfiguration importConfiguration = new SSTableImportConfigurationImpl(importCacheConfiguration);
+        ServiceConfiguration serviceConfiguration = TestServiceConfiguration.builder()
+                                                                            .sstableImportConfiguration(importConfiguration)
+                                                                            .build();
+
+        // Permission cache configuration built without calling expireAfterAccess
+        CacheConfiguration permissionCacheConfiguration
+        = CacheConfigurationImpl.builder().maximumSize(AUTHORIZATION_CACHE_MAX_SIZE).enabled(true).build();
+        AccessControlConfiguration accessControlConfiguration
+        = AccessControlConfigurationImpl.builder()
+                                        .enabled(true)
+                                        .permissionCacheConfiguration(permissionCacheConfiguration)
+                                        .build();
+
+        SidecarConfiguration sidecarConfiguration = mock(SidecarConfiguration.class);
+        when(sidecarConfiguration.accessControlConfiguration()).thenReturn(accessControlConfiguration);
+        when(sidecarConfiguration.serviceConfiguration()).thenReturn(serviceConfiguration);
+
+        SSTableImporter importer = mock(SSTableImporter.class);
+        SidecarMetrics sidecarMetrics = new SidecarMetricsImpl(FACTORY, null);
+
+        assertThatThrownBy(() -> new CacheFactory(sidecarConfiguration, importer, sidecarMetrics, fakeTicker::read))
+        .isInstanceOf(ConfigurationException.class)
+        .hasMessageContaining("Authorization handler cache must be configured with expireAfterAccess");
+    }
+
     private Void ssTableImportCacheEntry(Cache<SSTableImporter.ImportOptions, 
Future<Void>> cache,
                                          SSTableImporter.ImportOptions key, 
Void value)
     throws ExecutionException, InterruptedException
@@ -198,4 +329,26 @@ class CacheFactoryTest
                .host("localhost")
                .build();
     }
+
+    /**
+     * Creates an authorization cache key for a user with the given roles and an authorization
+     * context that carries the "keyspace" and "table" variables.
+     *
+     * @param handlerId id of the handler the key belongs to
+     * @param username  name of the user
+     * @param roles     roles stored under {@code CASSANDRA_ROLES_ATTRIBUTE_NAME}
+     * @param keyspace  value of the "keyspace" variable
+     * @param table     value of the "table" variable
+     * @return the cache key created from the handler id and the authorization context
+     */
+    private AuthorizationCacheKey createAuthorizationCacheKey(int handlerId, String username, List<String> roles,
+                                                              String keyspace, String table)
+    {
+        User cacheUser = User.fromName(username);
+        cacheUser.attributes().put(CASSANDRA_ROLES_ATTRIBUTE_NAME, roles);
+
+        MultiMap requestVariables = MultiMap.caseInsensitiveMultiMap();
+        requestVariables.add("keyspace", keyspace);
+        requestVariables.add("table", table);
+
+        AuthorizationContext context = AuthorizationContext.create(cacheUser);
+        // copy every variable, in insertion order, into the context's own variable map
+        context.variables().addAll(requestVariables);
+        return AuthorizationCacheKey.create(handlerId, context);
+    }
+
+    /**
+     * Looks up {@code key} in the cache, offering {@code value} as the result of the mapping function,
+     * and waits for the resulting future.
+     *
+     * @param cache cache to query
+     * @param key   key to look up
+     * @param value value the mapping function returns when it is invoked
+     * @return the value the cache's future completes with
+     * @throws ExecutionException   if the future completed exceptionally
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     */
+    private Boolean authorizationCacheEntry(AsyncCache<AuthorizationCacheKey, Boolean> cache,
+                                            AuthorizationCacheKey key,
+                                            Boolean value) throws ExecutionException, InterruptedException
+    {
+        CompletableFuture<Boolean> pending = cache.get(key, ignoredKey -> value);
+        assertThat(pending).isNotNull();
+        return pending.get();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to