This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 75ea74907 [kv] Validate non-primary-key table for KvSnapshotLease
acquire and release APIs (#2723)
75ea74907 is described below
commit 75ea749072bf9a5b0d44e65f1faf41d26e2e74cc
Author: yunhong <[email protected]>
AuthorDate: Thu Feb 26 16:10:22 2026 +0800
[kv] Validate non-primary-key table for KvSnapshotLease acquire and release
APIs (#2723)
---
.../fluss/client/admin/FlussAdminITCase.java | 37 ++++++++++++++++++++
.../server/coordinator/CoordinatorService.java | 39 ++++++++++++++++------
2 files changed, 66 insertions(+), 10 deletions(-)
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index d2522d1eb..2ae715fe0 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -41,6 +41,7 @@ import org.apache.fluss.exception.InvalidDatabaseException;
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidReplicationFactorException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.PartitionAlreadyExistsException;
import org.apache.fluss.exception.PartitionNotExistException;
import org.apache.fluss.exception.SchemaNotExistException;
@@ -2016,4 +2017,40 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
.isInstanceOf(InvalidTableException.class)
.hasMessageContaining("Row count is disabled for this table");
}
+
+ @Test
+ void testKvSnapshotLeaseForLogTable() throws Exception {
+ // Create a log table (without primary key)
+ TablePath logTablePath = TablePath.of("test_db", "test_log_table_kv_lease");
+ Schema logSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("age", DataTypes.INT())
+ .build();
+ TableDescriptor logTableDescriptor =
+ TableDescriptor.builder().schema(logSchema).distributedBy(3).build();
+ long tableId = createTable(logTablePath, logTableDescriptor, true);
+
+ // Create a KvSnapshotLease
+ KvSnapshotLease lease = admin.createKvSnapshotLease("test-lease-log", 60000);
+
+ // Test acquireSnapshots should fail for log table
+ Map<TableBucket, Long> snapshotIds = new HashMap<>();
+ snapshotIds.put(new TableBucket(tableId, 0), 0L);
+ assertThatThrownBy(() -> lease.acquireSnapshots(snapshotIds).get())
+ .cause()
+ .isInstanceOf(NonPrimaryKeyTableException.class)
+ .hasMessageContaining("is not a primary key table");
+
+ // Test releaseSnapshots should fail for log table
+ assertThatThrownBy(
+ () ->
+ lease.releaseSnapshots(
+ Collections.singleton(new TableBucket(tableId, 0)))
+ .get())
+ .cause()
+ .isInstanceOf(NonPrimaryKeyTableException.class)
+ .hasMessageContaining("is not a primary key table");
+ }
}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 840c82e98..388f1816f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -33,6 +33,7 @@ import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidDatabaseException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.exception.LakeTableAlreadyExistException;
+import org.apache.fluss.exception.NonPrimaryKeyTableException;
import org.apache.fluss.exception.SecurityDisabledException;
import org.apache.fluss.exception.TableAlreadyExistException;
import org.apache.fluss.exception.TableNotPartitionedException;
@@ -50,6 +51,7 @@ import org.apache.fluss.metadata.ResolvedPartitionSpec;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
@@ -300,6 +302,11 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
*/
private void authorizeTableWithSession(
Session session, OperationType operationType, long tableId) {
+ TablePath tablePath = getTablePathById(tableId);
+ authorizer.authorize(session, operationType, Resource.table(tablePath));
+ }
+
+ private TablePath getTablePathById(long tableId) {
TablePath tablePath;
try {
// TODO: this will block on the coordinator event thread, consider refactor
@@ -320,8 +327,16 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
+ "metadata cache in the server is not updated yet.",
name(), tableId));
}
+ return tablePath;
+ }
- authorizer.authorize(session, operationType, Resource.table(tablePath));
+ private void validateKvTable(long tableId) {
+ TablePath tablePath = getTablePathById(tableId);
+ TableInfo tableInfo = metadataManager.getTable(tablePath);
+ if (!tableInfo.hasPrimaryKey()) {
+ throw new NonPrimaryKeyTableException(
+ "Table '" + tablePath + "' is not a primary key table");
+ }
}
@Override
@@ -881,13 +896,15 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
@Override
public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
AcquireKvSnapshotLeaseRequest request) {
- // Authorization: require WRITE permission on all tables in the request
- if (authorizer != null) {
- for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
- request.getSnapshotsToLeasesList()) {
- long tableId = kvSnapshotLeaseForTable.getTableId();
+ for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
+ request.getSnapshotsToLeasesList()) {
+ long tableId = kvSnapshotLeaseForTable.getTableId();
+ if (authorizer != null) {
+ // Authorization: require READ permission on all tables in the request
authorizeTable(OperationType.READ, tableId);
}
+
+ validateKvTable(tableId);
}
String leaseId = request.getLeaseId();
@@ -913,12 +930,14 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
@Override
public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease(
ReleaseKvSnapshotLeaseRequest request) {
- // Authorization: require WRITE permission on all tables in the request
- if (authorizer != null) {
- for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
- long tableId = tableBucket.getTableId();
+ for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
+ long tableId = tableBucket.getTableId();
+ if (authorizer != null) {
+ // Authorization: require READ permission on all tables in the request.
authorizeTable(OperationType.READ, tableId);
}
+
+ validateKvTable(tableId);
}
String leaseId = request.getLeaseId();