This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 8b11e00c2 [hotfix] Add authentication to the admin APIs that currently
lack it (#2350)
8b11e00c2 is described below
commit 8b11e00c239749c74bda8d243da09fb9fab5b893
Author: yunhong <[email protected]>
AuthorDate: Mon Jan 12 19:32:12 2026 +0800
[hotfix] Add authentication to the admin APIs that currently lack it (#2350)
---
.../org/apache/fluss/client/admin/FlussAdmin.java | 11 ++
.../security/acl/FlussAuthorizationITCase.java | 117 +++++++++++++++++++--
.../security/acl/FlinkAuthorizationITCase.java | 5 +-
.../org/apache/fluss/server/RpcServiceBase.java | 48 ++++++---
.../server/coordinator/CoordinatorService.java | 67 ++++++------
.../apache/fluss/server/tablet/TabletService.java | 13 +--
6 files changed, 191 insertions(+), 70 deletions(-)
diff --git
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 63b1347bd..88aec06c8 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -17,6 +17,7 @@
package org.apache.fluss.client.admin;
+import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.client.metadata.KvSnapshotMetadata;
import org.apache.fluss.client.metadata.KvSnapshots;
import org.apache.fluss.client.metadata.LakeSnapshot;
@@ -649,4 +650,14 @@ public class FlussAdmin implements Admin {
}
});
}
+
+ @VisibleForTesting
+ public AdminGateway getAdminGateway() {
+ return gateway;
+ }
+
+ @VisibleForTesting
+ public AdminReadOnlyGateway getAdminReadOnlyGateway() {
+ return readOnlyGateway;
+ }
}
diff --git
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
index 947d38f25..ba1fb7fe2 100644
---
a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
+++
b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -21,6 +21,7 @@ import org.apache.fluss.client.Connection;
import org.apache.fluss.client.ConnectionFactory;
import org.apache.fluss.client.FlussConnection;
import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.admin.FlussAdmin;
import org.apache.fluss.client.table.Table;
import org.apache.fluss.client.table.scanner.batch.BatchScanner;
import org.apache.fluss.client.table.writer.AppendWriter;
@@ -33,6 +34,9 @@ import org.apache.fluss.config.cluster.AlterConfig;
import org.apache.fluss.config.cluster.AlterConfigOpType;
import org.apache.fluss.config.cluster.ConfigEntry;
import org.apache.fluss.exception.AuthorizationException;
+import org.apache.fluss.exception.KvSnapshotNotExistException;
+import org.apache.fluss.exception.LakeTableSnapshotNotExistException;
+import org.apache.fluss.exception.TableNotPartitionedException;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metadata.TableBucket;
@@ -43,9 +47,11 @@ import org.apache.fluss.row.InternalRow;
import org.apache.fluss.rpc.GatewayClientProxy;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.gateway.AdminGateway;
+import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest;
import org.apache.fluss.rpc.messages.InitWriterRequest;
import org.apache.fluss.rpc.messages.InitWriterResponse;
import org.apache.fluss.rpc.messages.MetadataRequest;
@@ -60,13 +66,18 @@ import org.apache.fluss.security.acl.PermissionType;
import org.apache.fluss.security.acl.Resource;
import org.apache.fluss.security.acl.ResourceFilter;
import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import org.apache.fluss.server.zk.data.TableRegistration;
import org.apache.fluss.shaded.guava32.com.google.common.collect.Lists;
import org.apache.fluss.utils.CloseableIterator;
+import org.assertj.core.api.ThrowableAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.time.Duration;
import java.util.Arrays;
@@ -77,8 +88,10 @@ import java.util.concurrent.ExecutionException;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static org.apache.fluss.record.TestData.DATA1_SCHEMA;
+import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
import static org.apache.fluss.security.acl.AccessControlEntry.WILD_CARD_HOST;
@@ -390,10 +403,26 @@ public class FlussAuthorizationITCase {
}
@Test
- void testListTables() throws Exception {
+ void testDescribeTableOperation() throws Exception {
+ // test describe table operations like:
+ // 1. listTables
+ // 2. getTableInfo
+ // 3. getTableSchema
+ // 4. getLatestKvSnapshots
+ // 5. listPartitionInfos
+ // 6. getLatestLakeSnapshot
+
+ // First, call these methods without authorization.
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
.isEqualTo(Collections.emptyList());
-
+ assertNoTableDescribeAuth(() ->
guestAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get());
+ assertNoTableDescribeAuth(() ->
guestAdmin.getTableSchema(DATA1_TABLE_PATH_PK).get());
+ assertNoTableDescribeAuth(() ->
guestAdmin.getLatestKvSnapshots(DATA1_TABLE_PATH_PK).get());
+ assertNoTableDescribeAuth(() ->
guestAdmin.listPartitionInfos(DATA1_TABLE_PATH_PK).get());
+ assertNoTableDescribeAuth(
+ () ->
guestAdmin.getLatestLakeSnapshot(DATA1_TABLE_PATH_PK).get());
+
+ // Add an ACL that allows the guest to describe the table resource.
List<AclBinding> aclBindings =
Collections.singletonList(
new AclBinding(
@@ -405,8 +434,70 @@ public class FlussAuthorizationITCase {
PermissionType.ALLOW)));
rootAdmin.createAcls(aclBindings).all().get();
FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
+
+ // Now call the same methods with authorization.
assertThat(guestAdmin.listTables(DATA1_TABLE_PATH_PK.getDatabaseName()).get())
.isEqualTo(Collections.singletonList(DATA1_TABLE_PATH_PK.getTableName()));
+
assertThat(guestAdmin.getTableInfo(DATA1_TABLE_PATH_PK).get().getTablePath())
+ .isEqualTo(DATA1_TABLE_INFO_PK.getTablePath());
+
assertThat(guestAdmin.getTableSchema(DATA1_TABLE_PATH_PK).get().getSchema())
+ .isEqualTo(DATA1_SCHEMA_PK);
+ assertThat(guestAdmin.tableExists(DATA1_TABLE_PATH_PK).get()).isTrue();
+
assertThat(guestAdmin.getLatestKvSnapshots(DATA1_TABLE_PATH_PK).get().getBucketIds())
+ .containsExactlyInAnyOrder(0, 1, 2);
+ assertThatThrownBy(() ->
guestAdmin.listPartitionInfos(DATA1_TABLE_PATH_PK).get())
+ .rootCause()
+ .isInstanceOf(TableNotPartitionedException.class)
+ .hasMessageContaining(
+ "Table 'test_db_1.test_pk_table_1' is not a
partitioned table.");
+ assertThatThrownBy(() ->
guestAdmin.getLatestLakeSnapshot(DATA1_TABLE_PATH_PK).get())
+ .rootCause()
+ .isInstanceOf(LakeTableSnapshotNotExistException.class)
+ .hasMessageContaining("Lake table snapshot not exist for
table");
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"CoordinatorServer", "TabletServer"})
+ void testGetKvSnapshotMetadata(String serverType) throws Exception {
+ AdminReadOnlyGateway readOnlyGateway;
+ if (serverType.equals("CoordinatorServer")) {
+ readOnlyGateway = ((FlussAdmin) guestAdmin).getAdminGateway();
+ } else {
+ readOnlyGateway = ((FlussAdmin)
guestAdmin).getAdminReadOnlyGateway();
+ }
+
+ ZooKeeperClient zooKeeperClient =
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+ TableRegistration tableRegistration =
zooKeeperClient.getTable(DATA1_TABLE_PATH_PK).get();
+ long tableId = tableRegistration.tableId;
+ FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+ GetKvSnapshotMetadataRequest request = new
GetKvSnapshotMetadataRequest();
+ request.setTableId(tableId).setBucketId(0).setSnapshotId(0);
+ // Make sure every tablet server has a ready replica and ready metadata for
the table.
+ FLUSS_CLUSTER_EXTENSION.waitUntilAllReplicaReady(new
TableBucket(tableId, 0));
+
+ // call getKvSnapshotMetadata without authorization.
+ assertNoTableDescribeAuth(() ->
readOnlyGateway.getKvSnapshotMetadata(request).get());
+
+ // Add an ACL that allows the guest to describe the table resource.
+ List<AclBinding> aclBindings =
+ Collections.singletonList(
+ new AclBinding(
+
Resource.database(DATA1_TABLE_PATH_PK.getDatabaseName()),
+ new AccessControlEntry(
+ guestPrincipal,
+ "*",
+ OperationType.DESCRIBE,
+ PermissionType.ALLOW)));
+ rootAdmin.createAcls(aclBindings).all().get();
+ FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true);
+
+ // Call getKvSnapshotMetadata with authorization: no authorization
exception should be
+ // thrown; the call fails only because the requested kv snapshot does not exist.
+ assertThatThrownBy(() ->
readOnlyGateway.getKvSnapshotMetadata(request).get())
+ .rootCause()
+ .isInstanceOf(KvSnapshotNotExistException.class)
+ .hasMessageContaining("Failed to get kv snapshot metadata for
table bucket");
}
@Test
@@ -572,9 +663,9 @@ public class FlussAuthorizationITCase {
.rootCause()
.hasMessageContaining(
String.format(
- "No permission to WRITE table %s in
database %s",
- noWriteAclTable.getTableName(),
- noWriteAclTable.getDatabaseName()));
+ "Principal FlussPrincipal{name='guest',
type='User'} have no authorization to "
+ + "operate WRITE on resource
Resource{type=TABLE, name='%s'} ",
+ noWriteAclTable));
}
}
@@ -609,10 +700,9 @@ public class FlussAuthorizationITCase {
assertThatThrownBy(() ->
batchScanner.pollBatch(Duration.ofMinutes(1)))
.hasMessageContaining(
String.format(
- "No permission to %s table %s in
database %s",
- READ,
- DATA1_TABLE_PATH.getTableName(),
- DATA1_TABLE_PATH.getDatabaseName()));
+ "Principal
FlussPrincipal{name='guest', type='User'} have no authorization to "
+ + "operate %s on resource
Resource{type=TABLE, name='%s'}",
+ READ, DATA1_TABLE_PATH));
}
rootAdmin
.createAcls(
@@ -955,4 +1045,13 @@ public class FlussAuthorizationITCase {
conf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
return conf;
}
+
+ private void assertNoTableDescribeAuth(ThrowableAssert.ThrowingCallable
callable) {
+ assertThatThrownBy(callable)
+ .cause()
+ .isInstanceOf(AuthorizationException.class)
+ .hasMessageContaining(
+ "Principal FlussPrincipal{name='guest', type='User'}
have no authorization to "
+ + "operate DESCRIBE on resource
Resource{type=TABLE, name='test_db_1.test_pk_table_1'}");
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 05cbd7802..96e2ed199 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -383,8 +383,9 @@ abstract class FlinkAuthorizationITCase extends
AbstractTestBase {
.rootCause()
.hasMessageContaining(
String.format(
- "No permission to READ table %s in database
%s",
- tablePath.getTableName(),
tablePath.getDatabaseName()));
+ "Principal FlussPrincipal{name='guest',
type='User'} have no authorization to "
+ + "operate READ on resource
Resource{type=TABLE, name='%s'} ",
+ tablePath));
addAcl(Resource.database(tablePath.getDatabaseName()), READ);
assertQueryResultExactOrder(
tBatchEnv,
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
index b12bb787c..9e2bcbf16 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java
@@ -170,6 +170,20 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
return provider;
}
+ public abstract void authorizeTable(OperationType operationType, long
tableId);
+
+ public void authorizeDatabase(OperationType operationType, String
databaseName) {
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), operationType,
Resource.database(databaseName));
+ }
+ }
+
+ public void authorizeTable(OperationType operationType, TablePath
tablePath) {
+ if (authorizer != null) {
+ authorizer.authorize(currentSession(), operationType,
Resource.table(tablePath));
+ }
+ }
+
@Override
public CompletableFuture<ApiVersionsResponse>
apiVersions(ApiVersionsRequest request) {
Set<ApiKeys> apiKeys = apiManager.enabledApis();
@@ -210,15 +224,11 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
@Override
public CompletableFuture<GetDatabaseInfoResponse> getDatabaseInfo(
GetDatabaseInfoRequest request) {
- if (authorizer != null) {
- authorizer.authorize(
- currentSession(),
- OperationType.DESCRIBE,
- Resource.database(request.getDatabaseName()));
- }
+ String databaseName = request.getDatabaseName();
+ authorizeDatabase(OperationType.DESCRIBE, databaseName);
GetDatabaseInfoResponse response = new GetDatabaseInfoResponse();
- DatabaseInfo databaseInfo =
metadataManager.getDatabase(request.getDatabaseName());
+ DatabaseInfo databaseInfo = metadataManager.getDatabase(databaseName);
response.setDatabaseJson(databaseInfo.getDatabaseDescriptor().toJsonBytes())
.setCreatedTime(databaseInfo.getCreatedTime())
.setModifiedTime(databaseInfo.getModifiedTime());
@@ -227,6 +237,7 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
@Override
public CompletableFuture<DatabaseExistsResponse>
databaseExists(DatabaseExistsRequest request) {
+ // By design: databaseExists does not need to check database authorization.
DatabaseExistsResponse response = new DatabaseExistsResponse();
boolean exists =
metadataManager.databaseExists(request.getDatabaseName());
response.setExists(exists);
@@ -258,12 +269,7 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
@Override
public CompletableFuture<GetTableInfoResponse>
getTableInfo(GetTableInfoRequest request) {
TablePath tablePath = toTablePath(request.getTablePath());
- if (authorizer != null) {
- authorizer.authorize(
- currentSession(),
- OperationType.DESCRIBE,
- Resource.table(tablePath.getDatabaseName(),
tablePath.getTableName()));
- }
+ authorizeTable(OperationType.DESCRIBE, tablePath);
GetTableInfoResponse response = new GetTableInfoResponse();
TableInfo tableInfo = metadataManager.getTable(tablePath);
@@ -278,6 +284,8 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
@Override
public CompletableFuture<GetTableSchemaResponse>
getTableSchema(GetTableSchemaRequest request) {
TablePath tablePath = toTablePath(request.getTablePath());
+ authorizeTable(OperationType.DESCRIBE, tablePath);
+
final SchemaInfo schemaInfo;
if (request.hasSchemaId()) {
schemaInfo = metadataManager.getSchemaById(tablePath,
request.getSchemaId());
@@ -292,6 +300,7 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
@Override
public CompletableFuture<TableExistsResponse>
tableExists(TableExistsRequest request) {
+ // By design: tableExists does not need to check table authorization.
TableExistsResponse response = new TableExistsResponse();
boolean exists =
metadataManager.tableExists(toTablePath(request.getTablePath()));
response.setExists(exists);
@@ -302,6 +311,8 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
public CompletableFuture<GetLatestKvSnapshotsResponse>
getLatestKvSnapshots(
GetLatestKvSnapshotsRequest request) {
TablePath tablePath = toTablePath(request.getTablePath());
+ authorizeTable(OperationType.DESCRIBE, tablePath);
+
// get table info
TableInfo tableInfo = metadataManager.getTable(tablePath);
@@ -363,9 +374,12 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
@Override
public CompletableFuture<GetKvSnapshotMetadataResponse>
getKvSnapshotMetadata(
GetKvSnapshotMetadataRequest request) {
+ long tableId = request.getTableId();
+ authorizeTable(OperationType.DESCRIBE, tableId);
+
TableBucket tableBucket =
new TableBucket(
- request.getTableId(),
+ tableId,
request.hasPartitionId() ? request.getPartitionId() :
null,
request.getBucketId());
long snapshotId = request.getSnapshotId();
@@ -412,6 +426,8 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
public CompletableFuture<ListPartitionInfosResponse> listPartitionInfos(
ListPartitionInfosRequest request) {
TablePath tablePath = toTablePath(request.getTablePath());
+ authorizeTable(OperationType.DESCRIBE, tablePath);
+
Map<String, Long> partitionNameAndIds;
if (request.hasPartialPartitionSpec()) {
ResolvedPartitionSpec partitionSpecFromRequest =
@@ -430,8 +446,10 @@ public abstract class RpcServiceBase extends
RpcGatewayService implements AdminR
@Override
public CompletableFuture<GetLatestLakeSnapshotResponse>
getLatestLakeSnapshot(
GetLatestLakeSnapshotRequest request) {
- // get table info
TablePath tablePath = toTablePath(request.getTablePath());
+ authorizeTable(OperationType.DESCRIBE, tablePath);
+
+ // get table info
TableInfo tableInfo = metadataManager.getTable(tablePath);
// get table id
long tableId = tableInfo.getTableId();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 6f60c0c9c..70b359e4d 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -34,6 +34,8 @@ import
org.apache.fluss.exception.LakeTableAlreadyExistException;
import org.apache.fluss.exception.SecurityDisabledException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
+import org.apache.fluss.exception.UnknownServerException;
+import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
@@ -236,6 +238,31 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
IOUtils.closeQuietly(lakeCatalogDynamicLoader, "lake catalog");
}
+ @Override
+ public void authorizeTable(OperationType operationType, long tableId) {
+ if (authorizer != null) {
+ TablePath tablePath;
+ try {
+ AccessContextEvent<TablePath> getTablePathEvent =
+ new AccessContextEvent<>(ctx ->
ctx.getTablePathById(tableId));
+ eventManagerSupplier.get().put(getTablePathEvent);
+ tablePath = getTablePathEvent.getResultFuture().get();
+ } catch (Exception e) {
+ throw new UnknownServerException("Failed to get table path by
ID " + tableId, e);
+ }
+
+ if (tablePath == null) {
+ throw new UnknownTableOrBucketException(
+ String.format(
+ "This server %s does not know this table ID
%s. This may happen when the table "
+ + "metadata cache in the server is not
updated yet.",
+ name(), tableId));
+ }
+
+ authorizeTable(operationType, tablePath);
+ }
+ }
+
@Override
public CompletableFuture<CreateDatabaseResponse>
createDatabase(CreateDatabaseRequest request) {
if (authorizer != null) {
@@ -262,13 +289,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
@Override
public CompletableFuture<DropDatabaseResponse>
dropDatabase(DropDatabaseRequest request) {
- if (authorizer != null) {
- authorizer.authorize(
- currentSession(),
- OperationType.DROP,
- Resource.database(request.getDatabaseName()));
- }
-
+ authorizeDatabase(OperationType.DROP, request.getDatabaseName());
DropDatabaseResponse response = new DropDatabaseResponse();
metadataManager.dropDatabase(
request.getDatabaseName(), request.isIgnoreIfNotExists(),
request.isCascade());
@@ -279,12 +300,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
public CompletableFuture<CreateTableResponse>
createTable(CreateTableRequest request) {
TablePath tablePath = toTablePath(request.getTablePath());
tablePath.validate();
- if (authorizer != null) {
- authorizer.authorize(
- currentSession(),
- OperationType.CREATE,
- Resource.database(tablePath.getDatabaseName()));
- }
+ authorizeDatabase(OperationType.CREATE, tablePath.getDatabaseName());
TableDescriptor tableDescriptor;
try {
@@ -348,9 +364,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest
request) {
TablePath tablePath = toTablePath(request.getTablePath());
tablePath.validate();
- if (authorizer != null) {
- authorizer.authorize(currentSession(), OperationType.ALTER,
Resource.table(tablePath));
- }
+ authorizeTable(OperationType.ALTER, tablePath);
List<TableChange> alterTableConfigChanges =
toAlterTableConfigChanges(request.getConfigChangesList());
@@ -495,12 +509,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
@Override
public CompletableFuture<DropTableResponse> dropTable(DropTableRequest
request) {
TablePath tablePath = toTablePath(request.getTablePath());
- if (authorizer != null) {
- authorizer.authorize(
- currentSession(),
- OperationType.DROP,
- Resource.table(tablePath.getDatabaseName(),
tablePath.getTableName()));
- }
+ authorizeTable(OperationType.DROP, tablePath);
DropTableResponse response = new DropTableResponse();
metadataManager.dropTable(tablePath, request.isIgnoreIfNotExists());
@@ -511,12 +520,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
public CompletableFuture<CreatePartitionResponse> createPartition(
CreatePartitionRequest request) {
TablePath tablePath = toTablePath(request.getTablePath());
- if (authorizer != null) {
- authorizer.authorize(
- currentSession(),
- OperationType.WRITE,
- Resource.table(tablePath.getDatabaseName(),
tablePath.getTableName()));
- }
+ authorizeTable(OperationType.WRITE, tablePath);
CreatePartitionResponse response = new CreatePartitionResponse();
TableRegistration table =
metadataManager.getTableRegistration(tablePath);
@@ -552,12 +556,7 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
@Override
public CompletableFuture<DropPartitionResponse>
dropPartition(DropPartitionRequest request) {
TablePath tablePath = toTablePath(request.getTablePath());
- if (authorizer != null) {
- authorizer.authorize(
- currentSession(),
- OperationType.WRITE,
- Resource.table(tablePath.getDatabaseName(),
tablePath.getTableName()));
- }
+ authorizeTable(OperationType.WRITE, tablePath);
DropPartitionResponse response = new DropPartitionResponse();
TableRegistration table =
metadataManager.getTableRegistration(tablePath);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index 9a47b6383..992b96333 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -386,7 +386,8 @@ public final class TabletService extends RpcServiceBase
implements TabletServerG
return response;
}
- private void authorizeTable(OperationType operationType, long tableId) {
+ @Override
+ public void authorizeTable(OperationType operationType, long tableId) {
if (authorizer != null) {
TablePath tablePath =
metadataCache.getTablePath(tableId).orElse(null);
if (tablePath == null) {
@@ -396,15 +397,7 @@ public final class TabletService extends RpcServiceBase
implements TabletServerG
+ "metadata cache in the server is not
updated yet.",
serviceName, tableId));
}
- if (!authorizer.isAuthorized(
- currentSession(), operationType,
Resource.table(tablePath))) {
- throw new AuthorizationException(
- String.format(
- "No permission to %s table %s in database %s",
- operationType,
- tablePath.getTableName(),
- tablePath.getDatabaseName()));
- }
+ authorizeTable(operationType, tablePath);
}
}