This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 75ea74907 [kv] Validate non-primary-key table for KvSnapshotLease 
acquire and release APIs (#2723)
75ea74907 is described below

commit 75ea749072bf9a5b0d44e65f1faf41d26e2e74cc
Author: yunhong <[email protected]>
AuthorDate: Thu Feb 26 16:10:22 2026 +0800

    [kv] Validate non-primary-key table for KvSnapshotLease acquire and release 
APIs (#2723)
---
 .../fluss/client/admin/FlussAdminITCase.java       | 37 ++++++++++++++++++++
 .../server/coordinator/CoordinatorService.java     | 39 ++++++++++++++++------
 2 files changed, 66 insertions(+), 10 deletions(-)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index d2522d1eb..2ae715fe0 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -41,6 +41,7 @@ import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidPartitionException;
 import org.apache.fluss.exception.InvalidReplicationFactorException;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.NonPrimaryKeyTableException;
 import org.apache.fluss.exception.PartitionAlreadyExistsException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.exception.SchemaNotExistException;
@@ -2016,4 +2017,40 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                 .isInstanceOf(InvalidTableException.class)
                 .hasMessageContaining("Row count is disabled for this table");
     }
+
+    @Test
+    void testKvSnapshotLeaseForLogTable() throws Exception {
+        // Create a log table (without primary key)
+        TablePath logTablePath = TablePath.of("test_db", 
"test_log_table_kv_lease");
+        Schema logSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("age", DataTypes.INT())
+                        .build();
+        TableDescriptor logTableDescriptor =
+                
TableDescriptor.builder().schema(logSchema).distributedBy(3).build();
+        long tableId = createTable(logTablePath, logTableDescriptor, true);
+
+        // Create a KvSnapshotLease
+        KvSnapshotLease lease = admin.createKvSnapshotLease("test-lease-log", 
60000);
+
+        // Test acquireSnapshots should fail for log table
+        Map<TableBucket, Long> snapshotIds = new HashMap<>();
+        snapshotIds.put(new TableBucket(tableId, 0), 0L);
+        assertThatThrownBy(() -> lease.acquireSnapshots(snapshotIds).get())
+                .cause()
+                .isInstanceOf(NonPrimaryKeyTableException.class)
+                .hasMessageContaining("is not a primary key table");
+
+        // Test releaseSnapshots should fail for log table
+        assertThatThrownBy(
+                        () ->
+                                lease.releaseSnapshots(
+                                                Collections.singleton(new 
TableBucket(tableId, 0)))
+                                        .get())
+                .cause()
+                .isInstanceOf(NonPrimaryKeyTableException.class)
+                .hasMessageContaining("is not a primary key table");
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 840c82e98..388f1816f 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -33,6 +33,7 @@ import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.LakeTableAlreadyExistException;
+import org.apache.fluss.exception.NonPrimaryKeyTableException;
 import org.apache.fluss.exception.SecurityDisabledException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
@@ -50,6 +51,7 @@ import org.apache.fluss.metadata.ResolvedPartitionSpec;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
@@ -300,6 +302,11 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
      */
     private void authorizeTableWithSession(
             Session session, OperationType operationType, long tableId) {
+        TablePath tablePath = getTablePathById(tableId);
+        authorizer.authorize(session, operationType, 
Resource.table(tablePath));
+    }
+
+    private TablePath getTablePathById(long tableId) {
         TablePath tablePath;
         try {
             // TODO: this will block on the coordinator event thread, consider 
refactor
@@ -320,8 +327,16 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                                     + "metadata cache in the server is not 
updated yet.",
                             name(), tableId));
         }
+        return tablePath;
+    }
 
-        authorizer.authorize(session, operationType, 
Resource.table(tablePath));
+    private void validateKvTable(long tableId) {
+        TablePath tablePath = getTablePathById(tableId);
+        TableInfo tableInfo = metadataManager.getTable(tablePath);
+        if (!tableInfo.hasPrimaryKey()) {
+            throw new NonPrimaryKeyTableException(
+                    "Table '" + tablePath + "' is not a primary key table");
+        }
     }
 
     @Override
@@ -881,13 +896,15 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     @Override
     public CompletableFuture<AcquireKvSnapshotLeaseResponse> 
acquireKvSnapshotLease(
             AcquireKvSnapshotLeaseRequest request) {
-        // Authorization: require WRITE permission on all tables in the request
-        if (authorizer != null) {
-            for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
-                    request.getSnapshotsToLeasesList()) {
-                long tableId = kvSnapshotLeaseForTable.getTableId();
+        for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
+                request.getSnapshotsToLeasesList()) {
+            long tableId = kvSnapshotLeaseForTable.getTableId();
+            if (authorizer != null) {
+                // Authorization: require WRITE permission on all tables in 
the request
                 authorizeTable(OperationType.READ, tableId);
             }
+
+            validateKvTable(tableId);
         }
 
         String leaseId = request.getLeaseId();
@@ -913,12 +930,14 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     @Override
     public CompletableFuture<ReleaseKvSnapshotLeaseResponse> 
releaseKvSnapshotLease(
             ReleaseKvSnapshotLeaseRequest request) {
-        // Authorization: require WRITE permission on all tables in the request
-        if (authorizer != null) {
-            for (PbTableBucket tableBucket : 
request.getBucketsToReleasesList()) {
-                long tableId = tableBucket.getTableId();
+        for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
+            long tableId = tableBucket.getTableId();
+            if (authorizer != null) {
+                // Authorization: require WRITE permission on all tables in 
the request.
                 authorizeTable(OperationType.READ, tableId);
             }
+
+            validateKvTable(tableId);
         }
 
         String leaseId = request.getLeaseId();

Reply via email to