This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b11e00c2 [hotfix] Add authentication to the admin APIs that currently 
lack it (#2350)
8b11e00c2 is described below

commit 8b11e00c239749c74bda8d243da09fb9fab5b893
Author: yunhong <[email protected]>
AuthorDate: Mon Jan 12 19:32:12 2026 +0800

    [hotfix] Add authentication to the admin APIs that currently lack it (#2350)
---
 .../org/apache/fluss/client/admin/FlussAdmin.java  |  11 ++
 .../security/acl/FlussAuthorizationITCase.java     | 117 +++++++++++++++++++--
 .../security/acl/FlinkAuthorizationITCase.java     |   5 +-
 .../org/apache/fluss/server/RpcServiceBase.java    |  48 ++++++---
 .../server/coordinator/CoordinatorService.java     |  67 ++++++------
 .../apache/fluss/server/tablet/TabletService.java  |  13 +--
 6 files changed, 191 insertions(+), 70 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 63b1347bd..88aec06c8 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.client.admin;
 
+import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.metadata.KvSnapshotMetadata;
 import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -649,4 +650,14 @@ public class FlussAdmin implements Admin {
                     }
                 });
     }
+
+    @VisibleForTesting
+    public AdminGateway getAdminGateway() {
+        return gateway;
+    }
+
+    @VisibleForTesting
+    public AdminReadOnlyGateway getAdminReadOnlyGateway() {
+        return readOnlyGateway;
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index 947d38f25..ba1fb7fe2 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -21,6 +21,7 @@ import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.FlussConnection;
 import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.FlussAdmin;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.scanner.batch.BatchScanner;
 import org.apache.fluss.client.table.writer.AppendWriter;
@@ -33,6 +34,9 @@ import org.apache.fluss.config.cluster.AlterConfig;
 import org.apache.fluss.config.cluster.AlterConfigOpType;
 import org.apache.fluss.config.cluster.ConfigEntry;
 import org.apache.fluss.exception.AuthorizationException;
+import org.apache.fluss.exception.KvSnapshotNotExistException;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.TableBucket;
@@ -43,9 +47,11 @@ import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.rpc.GatewayClientProxy;
 import org.apache.fluss.rpc.RpcClient;
 import org.apache.fluss.rpc.gateway.AdminGateway;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.messages.InitWriterResponse;
 import org.apache.fluss.rpc.messages.MetadataRequest;
@@ -60,13 +66,18 @@ import org.apache.fluss.security.acl.PermissionType;
 import org.apache.fluss.security.acl.Resource;
 import org.apache.fluss.security.acl.ResourceFilter;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.TableRegistration;
 import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists;
 import org.apache.fluss.utils.CloseableIterator;
 
+import org.assertj.core.api.ThrowableAssert;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.time.Duration;
 import java.util.Arrays;
@@ -77,8 +88,10 @@ import java.util.concurrent.ExecutionException;
 
 import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
 import static org.apache.fluss.security.acl.AccessControlEntry.WILD_CARD_HOST;
@@ -390,10 +403,26 @@ public class FlussAuthorizationITCase {
     }
 
     @Test
-    void testListTables() throws Exception {
+    void testDescribeTableOperation() throws Exception {
+        // test describe table operations like:
+        // 1. listTables
+        // 2. getTableInfo
+        // 3. getTableSchema
+        // 4. getLatestKvSnapshots
+        // 5. listPartitionInfos
+        // 6. getLatestLakeSnapshot
+
+        // first check call these methods without authorization.
         
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
                 .isEqualTo(Collections.emptyList());
-
+        assertNoTableDescribeAuth(() -> 
guestAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get());
+        assertNoTableDescribeAuth(() -> 
guestAdmin.getTableSchema(DATA1_TABLE_PATH_PK).get());
+        assertNoTableDescribeAuth(() -> 
guestAdmin.getLatestKvSnapshots(DATA1_TABLE_PATH_PK).get());
+        assertNoTableDescribeAuth(() -> 
guestAdmin.listPartitionInfos(DATA1_TABLE_PATH_PK).get());
+        assertNoTableDescribeAuth(
+                () -> 
guestAdmin.getLatestLakeSnapshot(DATA1_TABLE_PATH_PK).get());
+
+        // add acl to allow guest describe table resource
         List<AclBinding> aclBindings =
                 Collections.singletonList(
                         new AclBinding(
@@ -405,8 +434,70 @@ public class FlussAuthorizationITCase {
                                         PermissionType.ALLOW)));
         rootAdmin.createAcls(aclBindings).all().get();
         FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
+
+        // check call these methods with authorization.
         
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
                 
.isEqualTo(Collections.singletonList(DATA1_TABLE_PATH_PK.getTableName()));
+        
assertThat(guestAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get().getTablePath())
+                .isEqualTo(DATA1_TABLE_INFO_PK.getTablePath());
+        
assertThat(guestAdmin.getTableSchema(DATA1_TABLE_PATH_PK).get().getSchema())
+                .isEqualTo(DATA1_SCHEMA_PK);
+        assertThat(guestAdmin.tableExists(DATA1_TABLE_PATH_PK).get()).isTrue();
+        
assertThat(guestAdmin.getLatestKvSnapshots(DATA1_TABLE_PATH_PK).get().getBucketIds())
+                .containsExactlyInAnyOrder(0, 1, 2);
+        assertThatThrownBy(() -> 
guestAdmin.listPartitionInfos(DATA1_TABLE_PATH_PK).get())
+                .rootCause()
+                .isInstanceOf(TableNotPartitionedException.class)
+                .hasMessageContaining(
+                        "Table 'test_db_1.test_pk_table_1' is not a 
partitioned table.");
+        assertThatThrownBy(() -> 
guestAdmin.getLatestLakeSnapshot(DATA1_TABLE_PATH_PK).get())
+                .rootCause()
+                .isInstanceOf(LakeTableSnapshotNotExistException.class)
+                .hasMessageContaining("Lake table snapshot not exist for 
table");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"CoordinatorServer", "TabletServer"})
+    void testGetKvSnapshotMetadata(String serverType) throws Exception {
+        AdminReadOnlyGateway readOnlyGateway;
+        if (serverType.equals("CoordinatorServer")) {
+            readOnlyGateway = ((FlussAdmin) guestAdmin).getAdminGateway();
+        } else {
+            readOnlyGateway = ((FlussAdmin) 
guestAdmin).getAdminReadOnlyGateway();
+        }
+
+        ZooKeeperClient zooKeeperClient = 
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+        TableRegistration tableRegistration = 
zooKeeperClient.getTable(DATA1_TABLE_PATH_PK).get();
+        long tableId = tableRegistration.tableId;
+        FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+        GetKvSnapshotMetadataRequest request = new 
GetKvSnapshotMetadataRequest();
+        request.setTableId(tableId).setBucketId(0).setSnapshotId(0);
+        // Make sure all tabletServer has ready replica and ready metadata for 
the table.
+        FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(new 
TableBucket(tableId, 0));
+
+        // call getKvSnapshotMetadata without authorization.
+        assertNoTableDescribeAuth(() -> 
readOnlyGateway.getKvSnapshotMetadata(request).get());
+
+        // add acl to allow guest describe table resource
+        List<AclBinding> aclBindings =
+                Collections.singletonList(
+                        new AclBinding(
+                                
Resource.database(DATA1_TABLE_PATH_PK.getDatabaseName()),
+                                new AccessControlEntry(
+                                        guestPrincipal,
+                                        "*",
+                                        OperationType.DESCRIBE,
+                                        PermissionType.ALLOW)));
+        rootAdmin.createAcls(aclBindings).all().get();
+        FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
+
+        // call getKvSnapshotMetadata with authorization. no authorization 
exception should be
+        // thrown.
+        assertThatThrownBy(() -> 
readOnlyGateway.getKvSnapshotMetadata(request).get())
+                .rootCause()
+                .isInstanceOf(KvSnapshotNotExistException.class)
+                .hasMessageContaining("Failed to get kv snapshot metadata for 
table bucket");
     }
 
     @Test
@@ -572,9 +663,9 @@ public class FlussAuthorizationITCase {
                     .rootCause()
                     .hasMessageContaining(
                             String.format(
-                                    "No permission to WRITE table %s in 
database %s",
-                                    noWriteAclTable.getTableName(),
-                                    noWriteAclTable.getDatabaseName()));
+                                    "Principal FlussPrincipal{name='guest', 
type='User'} have no authorization to "
+                                            + "operate WRITE on resource 
Resource{type=TABLE, name='%s'} ",
+                                    noWriteAclTable));
         }
     }
 
@@ -609,10 +700,9 @@ public class FlussAuthorizationITCase {
                 assertThatThrownBy(() -> 
batchScanner.pollBatch(Duration.ofMinutes(1)))
                         .hasMessageContaining(
                                 String.format(
-                                        "No permission to %s table %s in 
database %s",
-                                        READ,
-                                        DATA1_TABLE_PATH.getTableName(),
-                                        DATA1_TABLE_PATH.getDatabaseName()));
+                                        "Principal 
FlussPrincipal{name='guest', type='User'} have no authorization to "
+                                                + "operate %s on resource 
Resource{type=TABLE, name='%s'}",
+                                        READ, DATA1_TABLE_PATH));
             }
             rootAdmin
                     .createAcls(
@@ -955,4 +1045,13 @@ public class FlussAuthorizationITCase {
         conf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
         return conf;
     }
+
+    private void assertNoTableDescribeAuth(ThrowableAssert.ThrowingCallable 
callable) {
+        assertThatThrownBy(callable)
+                .cause()
+                .isInstanceOf(AuthorizationException.class)
+                .hasMessageContaining(
+                        "Principal FlussPrincipal{name='guest', type='User'} 
have no authorization to "
+                                + "operate DESCRIBE on resource 
Resource{type=TABLE, name='test_db_1.test_pk_table_1'}");
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 05cbd7802..96e2ed199 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -383,8 +383,9 @@ abstract class FlinkAuthorizationITCase extends 
AbstractTestBase {
                 .rootCause()
                 .hasMessageContaining(
                         String.format(
-                                "No permission to READ table %s in database 
%s",
-                                tablePath.getTableName(), 
tablePath.getDatabaseName()));
+                                "Principal FlussPrincipal{name='guest', 
type='User'} have no authorization to "
+                                        + "operate READ on resource 
Resource{type=TABLE, name='%s'} ",
+                                tablePath));
         addAcl(Resource.database(tablePath.getDatabaseName()), READ);
         assertQueryResultExactOrder(
                 tBatchEnv,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java 
b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
index b12bb787c..9e2bcbf16 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
@@ -170,6 +170,20 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
         return provider;
     }
 
+    public abstract void authorizeTable(OperationType operationType, long 
tableId);
+
+    public void authorizeDatabase(OperationType operationType, String 
databaseName) {
+        if (authorizer != null) {
+            authorizer.authorize(currentSession(), operationType, 
Resource.database(databaseName));
+        }
+    }
+
+    public void authorizeTable(OperationType operationType, TablePath 
tablePath) {
+        if (authorizer != null) {
+            authorizer.authorize(currentSession(), operationType, 
Resource.table(tablePath));
+        }
+    }
+
     @Override
     public CompletableFuture<ApiVersionsResponse> 
apiVersions(ApiVersionsRequest request) {
         Set<ApiKeys> apiKeys = apiManager.enabledApis();
@@ -210,15 +224,11 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     @Override
     public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(
             GetDatabaseInfoRequest request) {
-        if (authorizer != null) {
-            authorizer.authorize(
-                    currentSession(),
-                    OperationType.DESCRIBE,
-                    Resource.database(request.getDatabaseName()));
-        }
+        String databaseName = request.getDatabaseName();
+        authorizeDatabase(OperationType.DESCRIBE, databaseName);
 
         GetDatabaseInfoResponse response = new GetDatabaseInfoResponse();
-        DatabaseInfo databaseInfo = 
metadataManager.getDatabase(request.getDatabaseName());
+        DatabaseInfo databaseInfo = metadataManager.getDatabase(databaseName);
         
response.setDatabaseJson(databaseInfo.getDatabaseDescriptor().toJsonBytes())
                 .setCreatedTime(databaseInfo.getCreatedTime())
                 .setModifiedTime(databaseInfo.getModifiedTime());
@@ -227,6 +237,7 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
 
     @Override
     public CompletableFuture<DatabaseExistsResponse> 
databaseExists(DatabaseExistsRequest request) {
+        // By design: database exists not need to check database authorization.
         DatabaseExistsResponse response = new DatabaseExistsResponse();
         boolean exists = 
metadataManager.databaseExists(request.getDatabaseName());
         response.setExists(exists);
@@ -258,12 +269,7 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     @Override
     public CompletableFuture<GetTableInfoResponse> 
getTableInfo(GetTableInfoRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
-        if (authorizer != null) {
-            authorizer.authorize(
-                    currentSession(),
-                    OperationType.DESCRIBE,
-                    Resource.table(tablePath.getDatabaseName(), 
tablePath.getTableName()));
-        }
+        authorizeTable(OperationType.DESCRIBE, tablePath);
 
         GetTableInfoResponse response = new GetTableInfoResponse();
         TableInfo tableInfo = metadataManager.getTable(tablePath);
@@ -278,6 +284,8 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     @Override
     public CompletableFuture<GetTableSchemaResponse> 
getTableSchema(GetTableSchemaRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
+        authorizeTable(OperationType.DESCRIBE, tablePath);
+
         final SchemaInfo schemaInfo;
         if (request.hasSchemaId()) {
             schemaInfo = metadataManager.getSchemaById(tablePath, 
request.getSchemaId());
@@ -292,6 +300,7 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
 
     @Override
     public CompletableFuture<TableExistsResponse> 
tableExists(TableExistsRequest request) {
+        // By design: table exists not need to check table authorization.
         TableExistsResponse response = new TableExistsResponse();
         boolean exists = 
metadataManager.tableExists(toTablePath(request.getTablePath()));
         response.setExists(exists);
@@ -302,6 +311,8 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     public CompletableFuture<GetLatestKvSnapshotsResponse> 
getLatestKvSnapshots(
             GetLatestKvSnapshotsRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
+        authorizeTable(OperationType.DESCRIBE, tablePath);
+
         // get table info
         TableInfo tableInfo = metadataManager.getTable(tablePath);
 
@@ -363,9 +374,12 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     @Override
     public CompletableFuture<GetKvSnapshotMetadataResponse> 
getKvSnapshotMetadata(
             GetKvSnapshotMetadataRequest request) {
+        long tableId = request.getTableId();
+        authorizeTable(OperationType.DESCRIBE, tableId);
+
         TableBucket tableBucket =
                 new TableBucket(
-                        request.getTableId(),
+                        tableId,
                         request.hasPartitionId() ? request.getPartitionId() : 
null,
                         request.getBucketId());
         long snapshotId = request.getSnapshotId();
@@ -412,6 +426,8 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
             ListPartitionInfosRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
+        authorizeTable(OperationType.DESCRIBE, tablePath);
+
         Map<String, Long> partitionNameAndIds;
         if (request.hasPartialPartitionSpec()) {
             ResolvedPartitionSpec partitionSpecFromRequest =
@@ -430,8 +446,10 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     @Override
     public CompletableFuture<GetLatestLakeSnapshotResponse> 
getLatestLakeSnapshot(
             GetLatestLakeSnapshotRequest request) {
-        // get table info
         TablePath tablePath = toTablePath(request.getTablePath());
+        authorizeTable(OperationType.DESCRIBE, tablePath);
+
+        // get table info
         TableInfo tableInfo = metadataManager.getTable(tablePath);
         // get table id
         long tableId = tableInfo.getTableId();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 6f60c0c9c..70b359e4d 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -34,6 +34,8 @@ import 
org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.SecurityDisabledException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
+import org.apache.fluss.exception.UnknownServerException;
+import org.apache.fluss.exception.UnknownTableOrBucketException;
 import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.fs.FsPath;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -236,6 +238,31 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         IOUtils.closeQuietly(lakeCatalogDynamicLoader, "lake catalog");
     }
 
+    @Override
+    public void authorizeTable(OperationType operationType, long tableId) {
+        if (authorizer != null) {
+            TablePath tablePath;
+            try {
+                AccessContextEvent<TablePath> getTablePathEvent =
+                        new AccessContextEvent<>(ctx -> 
ctx.getTablePathById(tableId));
+                eventManagerSupplier.get().put(getTablePathEvent);
+                tablePath = getTablePathEvent.getResultFuture().get();
+            } catch (Exception e) {
+                throw new UnknownServerException("Failed to get table path by 
ID " + tableId, e);
+            }
+
+            if (tablePath == null) {
+                throw new UnknownTableOrBucketException(
+                        String.format(
+                                "This server %s does not know this table ID 
%s. This may happen when the table "
+                                        + "metadata cache in the server is not 
updated yet.",
+                                name(), tableId));
+            }
+
+            authorizeTable(operationType, tablePath);
+        }
+    }
+
     @Override
     public CompletableFuture<CreateDatabaseResponse> 
createDatabase(CreateDatabaseRequest request) {
         if (authorizer != null) {
@@ -262,13 +289,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
 
     @Override
     public CompletableFuture<DropDatabaseResponse> 
dropDatabase(DropDatabaseRequest request) {
-        if (authorizer != null) {
-            authorizer.authorize(
-                    currentSession(),
-                    OperationType.DROP,
-                    Resource.database(request.getDatabaseName()));
-        }
-
+        authorizeDatabase(OperationType.DROP, request.getDatabaseName());
         DropDatabaseResponse response = new DropDatabaseResponse();
         metadataManager.dropDatabase(
                 request.getDatabaseName(), request.isIgnoreIfNotExists(), 
request.isCascade());
@@ -279,12 +300,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     public CompletableFuture<CreateTableResponse> 
createTable(CreateTableRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
         tablePath.validate();
-        if (authorizer != null) {
-            authorizer.authorize(
-                    currentSession(),
-                    OperationType.CREATE,
-                    Resource.database(tablePath.getDatabaseName()));
-        }
+        authorizeDatabase(OperationType.CREATE, tablePath.getDatabaseName());
 
         TableDescriptor tableDescriptor;
         try {
@@ -348,9 +364,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest 
request) {
         TablePath tablePath = toTablePath(request.getTablePath());
         tablePath.validate();
-        if (authorizer != null) {
-            authorizer.authorize(currentSession(), OperationType.ALTER, 
Resource.table(tablePath));
-        }
+        authorizeTable(OperationType.ALTER, tablePath);
 
         List<TableChange> alterTableConfigChanges =
                 toAlterTableConfigChanges(request.getConfigChangesList());
@@ -495,12 +509,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     @Override
     public CompletableFuture<DropTableResponse> dropTable(DropTableRequest 
request) {
         TablePath tablePath = toTablePath(request.getTablePath());
-        if (authorizer != null) {
-            authorizer.authorize(
-                    currentSession(),
-                    OperationType.DROP,
-                    Resource.table(tablePath.getDatabaseName(), 
tablePath.getTableName()));
-        }
+        authorizeTable(OperationType.DROP, tablePath);
 
         DropTableResponse response = new DropTableResponse();
         metadataManager.dropTable(tablePath, request.isIgnoreIfNotExists());
@@ -511,12 +520,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     public CompletableFuture<CreatePartitionResponse> createPartition(
             CreatePartitionRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
-        if (authorizer != null) {
-            authorizer.authorize(
-                    currentSession(),
-                    OperationType.WRITE,
-                    Resource.table(tablePath.getDatabaseName(), 
tablePath.getTableName()));
-        }
+        authorizeTable(OperationType.WRITE, tablePath);
 
         CreatePartitionResponse response = new CreatePartitionResponse();
         TableRegistration table = 
metadataManager.getTableRegistration(tablePath);
@@ -552,12 +556,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     @Override
     public CompletableFuture<DropPartitionResponse> 
dropPartition(DropPartitionRequest request) {
         TablePath tablePath = toTablePath(request.getTablePath());
-        if (authorizer != null) {
-            authorizer.authorize(
-                    currentSession(),
-                    OperationType.WRITE,
-                    Resource.table(tablePath.getDatabaseName(), 
tablePath.getTableName()));
-        }
+        authorizeTable(OperationType.WRITE, tablePath);
 
         DropPartitionResponse response = new DropPartitionResponse();
         TableRegistration table = 
metadataManager.getTableRegistration(tablePath);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 9a47b6383..992b96333 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -386,7 +386,8 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
         return response;
     }
 
-    private void authorizeTable(OperationType operationType, long tableId) {
+    @Override
+    public void authorizeTable(OperationType operationType, long tableId) {
         if (authorizer != null) {
             TablePath tablePath = 
metadataCache.getTablePath(tableId).orElse(null);
             if (tablePath == null) {
@@ -396,15 +397,7 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
                                         + "metadata cache in the server is not 
updated yet.",
                                 serviceName, tableId));
             }
-            if (!authorizer.isAuthorized(
-                    currentSession(), operationType, 
Resource.table(tablePath))) {
-                throw new AuthorizationException(
-                        String.format(
-                                "No permission to %s table %s in database %s",
-                                operationType,
-                                tablePath.getTableName(),
-                                tablePath.getDatabaseName()));
-            }
+            authorizeTable(operationType, tablePath);
         }
     }
 

Reply via email to