This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit 422d422b93c655c58f84a83f719f7bbfb94bfb83
Author: Jark Wu <[email protected]>
AuthorDate: Thu Feb 12 00:15:55 2026 +0800

    [kv] Support COUNT(*) pushdown for primary key tables and introduce 
GetTableStats RPC (#2651)
---
 .../java/org/apache/fluss/client/admin/Admin.java  |   8 +
 .../org/apache/fluss/client/admin/FlussAdmin.java  | 110 +++++++++++++
 .../fluss/client/utils/ClientRpcMessageUtils.java  |  27 ++++
 .../fluss/client/admin/FlussAdminITCase.java       | 173 +++++++++++++++++++++
 .../java/org/apache/fluss/metadata/TableStats.java |  63 ++++++++
 .../fluss/flink/source/FlinkTableSource.java       |   3 +-
 .../apache/fluss/flink/utils/PushdownUtils.java    |  78 +++++++---
 .../flink/sink/testutils/TestAdminAdapter.java     |   6 +
 .../flink/source/FlinkTableSourceBatchITCase.java  |  55 ++++++-
 .../rpc/entity/TableStatsResultForBucket.java      |  44 ++++++
 .../fluss/rpc/gateway/TabletServerGateway.java     |  10 ++
 .../fluss/rpc/netty/client/ServerConnection.java   |   1 +
 .../org/apache/fluss/rpc/protocol/ApiKeys.java     |   3 +-
 fluss-rpc/src/main/proto/FlussApi.proto            |  46 ++++++
 .../fluss/rpc/TestingTabletGatewayService.java     |   7 +
 .../apache/fluss/server/kv/KvRecoverHelper.java    |  20 ++-
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  33 +++-
 .../fluss/server/kv/prewrite/KvPreWriteBuffer.java |  65 ++++++--
 .../org/apache/fluss/server/log/LogTablet.java     |   4 +
 .../org/apache/fluss/server/replica/Replica.java   |  15 ++
 .../fluss/server/replica/ReplicaManager.java       |  18 +++
 .../apache/fluss/server/tablet/TabletService.java  |  15 ++
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  44 ++++++
 .../org/apache/fluss/server/kv/KvTabletTest.java   | 143 +++++++++++++++++
 .../server/kv/prewrite/KvPreWriteBufferTest.java   |  77 +++++++--
 .../server/replica/KvReplicaRestoreITCase.java     |  64 +++++++-
 .../server/tablet/TestTabletServerGateway.java     |   7 +
 27 files changed, 1063 insertions(+), 76 deletions(-)

diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
index 04e7cb214..d8fe57660 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java
@@ -65,6 +65,7 @@ import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metadata.TableStats;
 import org.apache.fluss.security.acl.AclBinding;
 import org.apache.fluss.security.acl.AclBindingFilter;
 
@@ -533,6 +534,13 @@ public interface Admin extends AutoCloseable {
             Collection<Integer> buckets,
             OffsetSpec offsetSpec);
 
+    /**
+     * Asynchronously gets the statistics of this table.
+     *
+     * @return a future containing the {@link TableStats} of the table
+     */
+    CompletableFuture<TableStats> getTableStats(TablePath tablePath);
+
     /**
      * Retrieves ACL entries filtered by principal for the specified resource.
      *
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
index 76662dc22..8f39a6e42 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java
@@ -30,6 +30,7 @@ import org.apache.fluss.cluster.rebalance.RebalanceProgress;
 import org.apache.fluss.cluster.rebalance.ServerTag;
 import org.apache.fluss.config.cluster.AlterConfig;
 import org.apache.fluss.config.cluster.ConfigEntry;
+import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
@@ -44,6 +45,7 @@ import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metadata.TableStats;
 import org.apache.fluss.rpc.GatewayClientProxy;
 import org.apache.fluss.rpc.RpcClient;
 import org.apache.fluss.rpc.gateway.AdminGateway;
@@ -70,6 +72,8 @@ import 
org.apache.fluss.rpc.messages.GetLatestKvSnapshotsRequest;
 import org.apache.fluss.rpc.messages.GetProducerOffsetsRequest;
 import org.apache.fluss.rpc.messages.GetTableInfoRequest;
 import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
+import org.apache.fluss.rpc.messages.GetTableStatsRequest;
+import org.apache.fluss.rpc.messages.GetTableStatsResponse;
 import org.apache.fluss.rpc.messages.ListAclsRequest;
 import org.apache.fluss.rpc.messages.ListDatabasesRequest;
 import org.apache.fluss.rpc.messages.ListDatabasesResponse;
@@ -82,6 +86,7 @@ import org.apache.fluss.rpc.messages.PbAlterConfig;
 import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket;
 import org.apache.fluss.rpc.messages.PbPartitionSpec;
 import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.messages.PbTableStatsRespForBucket;
 import org.apache.fluss.rpc.messages.RebalanceRequest;
 import org.apache.fluss.rpc.messages.RebalanceResponse;
 import org.apache.fluss.rpc.messages.RemoveServerTagRequest;
@@ -91,6 +96,7 @@ import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.security.acl.AclBinding;
 import org.apache.fluss.security.acl.AclBindingFilter;
 import org.apache.fluss.utils.MapUtils;
+import org.apache.fluss.utils.concurrent.FutureUtils;
 
 import javax.annotation.Nullable;
 
@@ -106,6 +112,7 @@ import java.util.concurrent.CompletableFuture;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
+import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeGetTableStatsRequest;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeListOffsetsRequest;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makePbPartitionSpec;
 import static 
org.apache.fluss.client.utils.ClientRpcMessageUtils.makeRegisterProducerOffsetsRequest;
@@ -457,6 +464,47 @@ public class FlussAdmin implements Admin {
         return listOffsets(PhysicalTablePath.of(tablePath, partitionName), 
buckets, offsetSpec);
     }
 
+    @Override
+    public CompletableFuture<TableStats> getTableStats(TablePath tablePath) {
+        metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
+        TableInfo tableInfo = getTableInfo(tablePath).join();
+        try {
+            int bucketCount = tableInfo.getNumBuckets();
+            List<PartitionInfo> partitionInfos;
+            if (tableInfo.isPartitioned()) {
+                partitionInfos = listPartitionInfos(tablePath).get();
+            } else {
+                partitionInfos = Collections.singletonList(null);
+            }
+            // create all TableBuckets for each partition and bucket 
combination
+            Map<TableBucket, CompletableFuture<Long>> bucketToRowCountMap = 
new HashMap<>();
+            for (PartitionInfo partitionInfo : partitionInfos) {
+                for (int bucket = 0; bucket < bucketCount; bucket++) {
+                    TableBucket tb =
+                            new TableBucket(
+                                    tableInfo.getTableId(),
+                                    partitionInfo == null ? null : 
partitionInfo.getPartitionId(),
+                                    bucket);
+                    bucketToRowCountMap.put(tb, new CompletableFuture<>());
+                }
+            }
+            Map<Integer, GetTableStatsRequest> requestMap =
+                    prepareTableStatsRequests(
+                            metadataUpdater, bucketToRowCountMap.keySet(), 
tablePath);
+            sendTableStatsRequest(
+                    metadataUpdater, tableInfo.getTableId(), requestMap, 
bucketToRowCountMap);
+            return FutureUtils.combineAll(bucketToRowCountMap.values())
+                    .thenApply(
+                            counts -> {
+                                long totalRowCount = 
counts.stream().reduce(0L, Long::sum);
+                                return new TableStats(totalRowCount);
+                            });
+        } catch (Exception e) {
+            throw new FlussRuntimeException(
+                    String.format("Failed to get row count for the table 
'%s'.", tablePath), e);
+        }
+    }
+
     private ListOffsetsResult listOffsets(
             PhysicalTablePath physicalTablePath,
             Collection<Integer> buckets,
@@ -678,6 +726,68 @@ public class FlussAdmin implements Admin {
         // nothing to do yet
     }
 
+    private static Map<Integer, GetTableStatsRequest> 
prepareTableStatsRequests(
+            MetadataUpdater metadataUpdater, Collection<TableBucket> buckets, 
TablePath tablePath) {
+        Map<Integer, List<TableBucket>> nodeForBucketList = new HashMap<>();
+        for (TableBucket tb : buckets) {
+            int leader = metadataUpdater.leaderFor(tablePath, tb);
+            nodeForBucketList.computeIfAbsent(leader, k -> new 
ArrayList<>()).add(tb);
+        }
+
+        Map<Integer, GetTableStatsRequest> requests = new HashMap<>();
+        nodeForBucketList.forEach(
+                (leader, tbs) -> requests.put(leader, 
makeGetTableStatsRequest(tbs)));
+        return requests;
+    }
+
+    private static void sendTableStatsRequest(
+            MetadataUpdater metadataUpdater,
+            long tableId,
+            Map<Integer, GetTableStatsRequest> leaderToRequestMap,
+            Map<TableBucket, CompletableFuture<Long>> bucketToRowCountMap) {
+        leaderToRequestMap.forEach(
+                (leader, request) -> {
+                    TabletServerGateway gateway =
+                            
metadataUpdater.newTabletServerClientForNode(leader);
+                    if (gateway == null) {
+                        throw new LeaderNotAvailableException(
+                                "Server " + leader + " is not found in 
metadata cache.");
+                    } else {
+                        gateway.getTableStats(request)
+                                .whenComplete(
+                                        (response, t) ->
+                                                handleTableStatsResponse(
+                                                        response, t, tableId, 
bucketToRowCountMap));
+                    }
+                });
+    }
+
+    private static void handleTableStatsResponse(
+            GetTableStatsResponse response,
+            Throwable t,
+            long tableId,
+            Map<TableBucket, CompletableFuture<Long>> bucketToRowCountMap) {
+        if (t != null) {
+            // fail all futures to fail fast
+            bucketToRowCountMap.values().forEach(f -> 
f.completeExceptionally(t));
+            return;
+        }
+        for (PbTableStatsRespForBucket resp : response.getBucketsRespsList()) {
+            TableBucket tb =
+                    new TableBucket(
+                            tableId,
+                            resp.hasPartitionId() ? resp.getPartitionId() : 
null,
+                            resp.getBucketId());
+            if (resp.hasErrorCode()) {
+                bucketToRowCountMap
+                        .get(tb)
+                        
.completeExceptionally(ApiError.fromErrorMessage(resp).exception());
+            } else {
+                bucketToRowCountMap.get(tb).complete(resp.getRowCount());
+            }
+        }
+    }
+
     private static Map<Integer, ListOffsetsRequest> prepareListOffsetsRequests(
             MetadataUpdater metadataUpdater,
             long tableId,
diff --git 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
index 9da29c0ea..a7dbec47c 100644
--- 
a/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
+++ 
b/fluss-client/src/main/java/org/apache/fluss/client/utils/ClientRpcMessageUtils.java
@@ -54,6 +54,7 @@ import 
org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
 import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse;
 import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
 import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
+import org.apache.fluss.rpc.messages.GetTableStatsRequest;
 import org.apache.fluss.rpc.messages.ListDatabasesResponse;
 import org.apache.fluss.rpc.messages.ListOffsetsRequest;
 import org.apache.fluss.rpc.messages.ListPartitionInfosResponse;
@@ -84,6 +85,7 @@ import 
org.apache.fluss.rpc.messages.PbRebalanceProgressForTable;
 import org.apache.fluss.rpc.messages.PbRemotePathAndLocalFile;
 import org.apache.fluss.rpc.messages.PbRenameColumn;
 import org.apache.fluss.rpc.messages.PbTableBucket;
+import org.apache.fluss.rpc.messages.PbTableStatsReqForBucket;
 import org.apache.fluss.rpc.messages.PrefixLookupRequest;
 import org.apache.fluss.rpc.messages.ProduceLogRequest;
 import org.apache.fluss.rpc.messages.PutKvRequest;
@@ -755,4 +757,29 @@ public class ClientRpcMessageUtils {
 
         return databaseSummaries;
     }
+
+    public static GetTableStatsRequest 
makeGetTableStatsRequest(List<TableBucket> buckets) {
+        if (buckets.isEmpty()) {
+            throw new IllegalArgumentException("Buckets list cannot be empty");
+        }
+        long tableId = buckets.get(0).getTableId();
+        List<PbTableStatsReqForBucket> pbBuckets =
+                buckets.stream()
+                        .map(
+                                bucket -> {
+                                    if (bucket.getTableId() != tableId) {
+                                        throw new IllegalArgumentException(
+                                                "All buckets should belong to 
the same table");
+                                    }
+                                    PbTableStatsReqForBucket pbBucket =
+                                            new PbTableStatsReqForBucket()
+                                                    
.setBucketId(bucket.getBucket());
+                                    if (bucket.getPartitionId() != null) {
+                                        
pbBucket.setPartitionId(bucket.getPartitionId());
+                                    }
+                                    return pbBucket;
+                                })
+                        .collect(Collectors.toList());
+        return new 
GetTableStatsRequest().setTableId(tableId).addAllBucketsReqs(pbBuckets);
+    }
 }
diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
index db1c0f01c..d2522d1eb 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java
@@ -22,6 +22,7 @@ import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.metadata.KvSnapshotMetadata;
 import org.apache.fluss.client.metadata.KvSnapshots;
 import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.AppendWriter;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.cluster.rebalance.ServerTag;
@@ -1843,4 +1844,176 @@ class FlussAdminITCase extends ClientToServerITCaseBase 
{
         admin.createTable(tablePath6, tableDescriptor6, false).get();
         assertThat(admin.tableExists(tablePath6).get()).isTrue();
     }
+
+    // ==================== Table Statistics Tests ====================
+
+    @Test
+    void testGetTableStatsForPrimaryKeyTableWithFailover() throws Exception {
+        // Create a primary key table
+        TablePath tablePath = TablePath.of("test_db", "test_pk_table_stats");
+        TableDescriptor tableDescriptor =
+                
TableDescriptor.builder().schema(DEFAULT_SCHEMA).distributedBy(3, "id").build();
+        long tableId = createTable(tablePath, tableDescriptor, true);
+        FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+        // Initially, row count should be 0
+        
assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(0);
+
+        // Insert some data
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            for (int i = 0; i < 100; i++) {
+                upsertWriter.upsert(row(i, "name" + i, i % 50));
+            }
+            upsertWriter.flush();
+        }
+        
assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(100);
+
+        // Delete some rows, update some rows, insert some rows
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            for (int i = 0; i < 30; i++) {
+                upsertWriter.delete(row(i, null, null));
+            }
+            upsertWriter.flush();
+        }
+        
assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(70);
+
+        // snapshot the row count
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+        // restart all the servers
+        for (int i = 0; i < 
FLUSS_CLUSTER_EXTENSION.getTabletServerNodes().size(); i++) {
+            FLUSS_CLUSTER_EXTENSION.stopTabletServer(i);
+            FLUSS_CLUSTER_EXTENSION.startTabletServer(i);
+        }
+
+        FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+        // reconnect with the restarted cluster
+        Connection newConn = ConnectionFactory.createConnection(clientConf);
+        try (Table table = newConn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            // update existed rows shouldn't increase row count
+            for (int i = 50; i < 90; i++) {
+                upsertWriter.upsert(row(i, "updated_name" + i, i % 50));
+            }
+            for (int i = 200; i < 300; i++) {
+                upsertWriter.upsert(row(i, "name" + i, i % 50));
+            }
+            upsertWriter.flush();
+        }
+        // Final row count should be 170
+        
assertThat(newConn.getAdmin().getTableStats(tablePath).get().getRowCount()).isEqualTo(170);
+        newConn.close();
+    }
+
+    @Test
+    void testGetTableStatsForLogTable() throws Exception {
+        // Create a log table (no primary key)
+        Schema logTableSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("age", DataTypes.INT())
+                        .build();
+        TablePath tablePath = TablePath.of("test_db", "test_log_table_stats");
+        TableDescriptor tableDescriptor =
+                
TableDescriptor.builder().schema(logTableSchema).distributedBy(3).build();
+        long tableId = createTable(tablePath, tableDescriptor, true);
+        FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
+
+        // Initially, row count should be 0
+        
assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(0);
+
+        // Append some data
+        try (Table table = conn.getTable(tablePath)) {
+            AppendWriter appendWriter = table.newAppend().createWriter();
+            for (int i = 0; i < 50; i++) {
+                appendWriter.append(row(i, "name" + i, i % 30));
+            }
+            appendWriter.flush();
+        }
+
+        
assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(50);
+    }
+
+    @Test
+    void testGetTableStatsForPartitionedTable() throws Exception {
+        // Create a partitioned primary key table
+        Schema partitionedSchema =
+                Schema.newBuilder()
+                        .primaryKey("id", "dt")
+                        .column("id", DataTypes.INT())
+                        .column("name", DataTypes.STRING())
+                        .column("dt", DataTypes.STRING())
+                        .build();
+        TablePath tablePath = TablePath.of("test_db", 
"test_partitioned_table_stats");
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(partitionedSchema)
+                        .distributedBy(2, "id")
+                        .partitionedBy("dt")
+                        .build();
+        long tableId = createTable(tablePath, tableDescriptor, true);
+
+        // Create partitions
+        admin.createPartition(tablePath, newPartitionSpec("dt", "2024-01-01"), 
false).get();
+        admin.createPartition(tablePath, newPartitionSpec("dt", "2024-01-02"), 
false).get();
+        admin.listPartitionInfos(tablePath)
+                .join()
+                .forEach(
+                        partition -> {
+                            
FLUSS_CLUSTER_EXTENSION.waitUntilTablePartitionReady(
+                                    tableId, partition.getPartitionId());
+                        });
+
+        // Initially, row count should be 0
+        
assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(0);
+
+        // Insert data into different partitions
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            // 30 rows in partition 2024-01-01
+            for (int i = 0; i < 30; i++) {
+                upsertWriter.upsert(row(i, "name" + i, "2024-01-01"));
+            }
+            // 20 rows in partition 2024-01-02
+            for (int i = 0; i < 20; i++) {
+                upsertWriter.upsert(row(i, "name" + i, "2024-01-02"));
+            }
+            upsertWriter.flush();
+        }
+        // Total row count should be 50
+        
assertThat(admin.getTableStats(tablePath).get().getRowCount()).isEqualTo(50);
+    }
+
+    @Test
+    void testGetTableStatsForWalChangelogModeTable() throws Exception {
+        // Create a primary key table with WAL changelog mode (row count 
disabled)
+        TablePath tablePath = TablePath.of("test_db", 
"test_wal_changelog_table_stats");
+        Map<String, String> properties = new HashMap<>();
+        properties.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(DEFAULT_SCHEMA)
+                        .distributedBy(3, "id")
+                        .properties(properties)
+                        .build();
+        createTable(tablePath, tableDescriptor, true);
+
+        // Insert some data
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
+            for (int i = 0; i < 10; i++) {
+                upsertWriter.upsert(row(i, "name" + i, i % 5));
+            }
+            upsertWriter.flush();
+        }
+
+        // Getting table stats should throw exception for WAL changelog mode 
tables
+        assertThatThrownBy(() -> admin.getTableStats(tablePath).get())
+                .cause()
+                .isInstanceOf(InvalidTableException.class)
+                .hasMessageContaining("Row count is disabled for this table");
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metadata/TableStats.java 
b/fluss-common/src/main/java/org/apache/fluss/metadata/TableStats.java
new file mode 100644
index 000000000..4f4022b55
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/metadata/TableStats.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.metadata;
+
+import org.apache.fluss.annotation.PublicEvolving;
+
+import java.util.Objects;
+
+/**
+ * Statistics of a table.
+ *
+ * @since 0.9
+ */
+@PublicEvolving
+public class TableStats {
+
+    private final long rowCount;
+
+    public TableStats(long rowCount) {
+        this.rowCount = rowCount;
+    }
+
+    /** Returns the current total row count of the table. */
+    public long getRowCount() {
+        return rowCount;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        TableStats that = (TableStats) o;
+        return rowCount == that.rowCount;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(rowCount);
+    }
+
+    @Override
+    public String toString() {
+        return "TableStats{" + "rowCount=" + rowCount + '}';
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
index 4af25571f..83b4dd29e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java
@@ -284,7 +284,7 @@ public class FlinkTableSource
                 results =
                         Collections.singleton(
                                 GenericRowData.of(
-                                        PushdownUtils.countLogTable(tablePath, 
flussConfig)));
+                                        PushdownUtils.countTable(tablePath, 
flussConfig)));
             }
 
             TypeInformation<RowData> resultTypeInfo =
@@ -593,7 +593,6 @@ public class FlinkTableSource
         // Only supports 'select count(*)/count(1) from source' for log and 
primary key tables now.
         if (streaming
                 || aggregateExpressions.size() != 1
-                || hasPrimaryKey()
                 || groupingSets.size() > 1
                 || (groupingSets.size() == 1 && groupingSets.get(0).length > 0)
                 // The count pushdown feature is not supported when the data 
lake is enabled.
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
index 16cf55a41..370782607 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PushdownUtils.java
@@ -22,8 +22,6 @@ import org.apache.fluss.client.ConnectionFactory;
 import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.client.admin.ListOffsetsResult;
 import org.apache.fluss.client.admin.OffsetSpec;
-import org.apache.fluss.client.admin.OffsetSpec.EarliestSpec;
-import org.apache.fluss.client.admin.OffsetSpec.LatestSpec;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.scanner.Scan;
 import org.apache.fluss.client.table.scanner.batch.BatchScanUtils;
@@ -31,12 +29,14 @@ import 
org.apache.fluss.client.table.scanner.batch.BatchScanner;
 import org.apache.fluss.client.table.writer.UpsertWriter;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.UnsupportedVersionException;
 import org.apache.fluss.flink.source.lookup.FlinkLookupFunction;
 import org.apache.fluss.flink.source.lookup.LookupNormalizer;
 import org.apache.fluss.metadata.PartitionInfo;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metadata.TableStats;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.GenericRow;
@@ -76,6 +76,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static 
org.apache.fluss.flink.source.lookup.LookupNormalizer.createPrimaryKeyLookupNormalizer;
+import static org.apache.fluss.utils.ExceptionUtils.findThrowable;
 
 /** Utilities for pushdown abilities. */
 public class PushdownUtils {
@@ -365,34 +366,53 @@ public class PushdownUtils {
         }
     }
 
-    public static long countLogTable(TablePath tablePath, Configuration 
flussConfig) {
+    public static long countTable(TablePath tablePath, Configuration 
flussConfig) {
         try (Connection connection = 
ConnectionFactory.createConnection(flussConfig);
                 Admin flussAdmin = connection.getAdmin()) {
-            TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
-            int bucketCount = tableInfo.getNumBuckets();
-            Collection<Integer> buckets =
-                    IntStream.range(0, 
bucketCount).boxed().collect(Collectors.toList());
-            List<PartitionInfo> partitionInfos;
-            if (tableInfo.isPartitioned()) {
-                partitionInfos = 
flussAdmin.listPartitionInfos(tablePath).get();
-            } else {
-                partitionInfos = Collections.singletonList(null);
-            }
-
-            List<CompletableFuture<Long>> countFutureList =
-                    offsetLengthes(flussAdmin, tablePath, partitionInfos, 
buckets);
-            // wait for all the response
-            CompletableFuture.allOf(countFutureList.toArray(new 
CompletableFuture[0])).join();
-            long count = 0;
-            for (CompletableFuture<Long> countFuture : countFutureList) {
-                count += countFuture.get();
+            try {
+                TableStats tableStats = 
flussAdmin.getTableStats(tablePath).get();
+                return tableStats.getRowCount();
+            } catch (Exception e) {
+                if (findThrowable(e, 
UnsupportedVersionException.class).isPresent()) {
+                    // if the server doesn't support getTableStats, we 
fallback to countLogTable,
+                    // which is less efficient.
+                    return countLogTable(flussAdmin, tablePath);
+                } else {
+                    throw e;
+                }
             }
-            return count;
         } catch (Exception e) {
             throw new FlussRuntimeException(e);
         }
     }
 
+    /**
+     * We keep this method for backward compatibility with old clusters (< 
0.9), but it's not efficient.
+     * We should use countTable instead in the future.
+     */
+    private static long countLogTable(Admin flussAdmin, TablePath tablePath) 
throws Exception {
+        TableInfo tableInfo = flussAdmin.getTableInfo(tablePath).get();
+        int bucketCount = tableInfo.getNumBuckets();
+        Collection<Integer> buckets =
+                IntStream.range(0, 
bucketCount).boxed().collect(Collectors.toList());
+        List<PartitionInfo> partitionInfos;
+        if (tableInfo.isPartitioned()) {
+            partitionInfos = flussAdmin.listPartitionInfos(tablePath).get();
+        } else {
+            partitionInfos = Collections.singletonList(null);
+        }
+
+        List<CompletableFuture<Long>> countFutureList =
+                offsetLengthes(flussAdmin, tablePath, partitionInfos, buckets);
+        // wait for all the response
+        CompletableFuture.allOf(countFutureList.toArray(new 
CompletableFuture[0])).join();
+        long count = 0;
+        for (CompletableFuture<Long> countFuture : countFutureList) {
+            count += countFuture.get();
+        }
+        return count;
+    }
+
     private static List<CompletableFuture<Long>> offsetLengthes(
             Admin flussAdmin,
             TablePath tablePath,
@@ -402,9 +422,19 @@ public class PushdownUtils {
         for (@Nullable PartitionInfo info : partitionInfos) {
             String partitionName = info != null ? info.getPartitionName() : 
null;
             ListOffsetsResult earliestOffsets =
-                    listOffsets(flussAdmin, tablePath, buckets, new 
EarliestSpec(), partitionName);
+                    listOffsets(
+                            flussAdmin,
+                            tablePath,
+                            buckets,
+                            new OffsetSpec.EarliestSpec(),
+                            partitionName);
             ListOffsetsResult latestOffsets =
-                    listOffsets(flussAdmin, tablePath, buckets, new 
LatestSpec(), partitionName);
+                    listOffsets(
+                            flussAdmin,
+                            tablePath,
+                            buckets,
+                            new OffsetSpec.LatestSpec(),
+                            partitionName);
             CompletableFuture<Long> apply =
                     earliestOffsets
                             .all()
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
index ee404eeb4..c8dbc0dc0 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/testutils/TestAdminAdapter.java
@@ -45,6 +45,7 @@ import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metadata.TableStats;
 import org.apache.fluss.security.acl.AclBinding;
 import org.apache.fluss.security.acl.AclBindingFilter;
 
@@ -222,6 +223,11 @@ public class TestAdminAdapter implements Admin {
         throw new UnsupportedOperationException("Not implemented in 
TestAdminAdapter");
     }
 
+    @Override
+    public CompletableFuture<TableStats> getTableStats(TablePath tablePath) {
+        throw new UnsupportedOperationException("Not implemented in 
TestAdminAdapter");
+    }
+
     @Override
     public CompletableFuture<Collection<AclBinding>> listAcls(AclBindingFilter 
aclBindingFilter) {
         throw new UnsupportedOperationException("Not implemented in 
TestAdminAdapter");
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
index 03f853f61..f449477df 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.source;
 import org.apache.fluss.client.table.Table;
 import org.apache.fluss.client.table.writer.AppendWriter;
 import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.flink.utils.FlinkTestBase;
 import org.apache.fluss.metadata.TablePath;
 
@@ -352,9 +353,11 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
 
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
-    void testCountPushDown(boolean partitionTable) throws Exception {
-        String tableName = partitionTable ? preparePartitionedLogTable() : 
prepareLogTable();
-        int expectedRows = partitionTable ? 10 : 5;
+    void testCountPushDownForPkTable(boolean partitionTable) throws Exception {
+        String tableName =
+                partitionTable
+                        ? prepareSourceTable(new String[] {"id", "dt"}, "dt")
+                        : prepareSourceTable(new String[] {"id"}, null);
         // normal scan
         String query = String.format("SELECT COUNT(*) FROM %s", tableName);
         assertThat(tEnv.explainSql(query))
@@ -362,7 +365,7 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
                         "aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]");
         CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
         List<String> collected = collectRowsWithTimeout(iterRows, 1);
-        List<String> expected = 
Collections.singletonList(String.format("+I[%s]", expectedRows));
+        List<String> expected = Collections.singletonList("+I[5]");
         assertThat(collected).isEqualTo(expected);
 
         // test not push down grouping count.
@@ -375,15 +378,51 @@ abstract class FlinkTableSourceBatchITCase extends 
FlinkTestBase {
                                         .wait())
                 .hasMessageContaining(
                         "Currently, Fluss only support queries on table with 
datalake enabled or point queries on primary key when it's in batch execution 
mode.");
+    }
+
+    @Test
+    void testCountPushDownWithWALMode() throws Exception {
+        String tableName = "test_count_table_with_wal";
+        tEnv.executeSql(
+                        String.format(
+                                "create table %s ("
+                                        + "  id int not null,"
+                                        + "  address varchar,"
+                                        + "  name varchar,"
+                                        + "  primary key (id) NOT ENFORCED)"
+                                        + " with ('bucket.num' = '4', 
'table.changelog.image' = 'wal')",
+                                tableName))
+                .await();
+        // normal scan
+        String query = String.format("SELECT COUNT(*) FROM %s", tableName);
+        assertThatThrownBy(() -> tEnv.executeSql(query))
+                .hasRootCauseInstanceOf(InvalidTableException.class)
+                .hasMessageContaining(
+                        "Row count is disabled for this table 
'defaultdb.test_count_table_with_wal'.");
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCountPushDownForLogTable(boolean partitionTable) throws Exception 
{
+        String tableName = partitionTable ? preparePartitionedLogTable() : 
prepareLogTable();
+        int expectedRows = partitionTable ? 10 : 5;
+        // normal scan
+        String query = String.format("SELECT COUNT(*) FROM %s", tableName);
+        assertThat(tEnv.explainSql(query))
+                .contains(
+                        "aggregates=[grouping=[], 
aggFunctions=[Count1AggFunction()]]]], fields=[count1$0]");
+        CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
+        List<String> collected = collectRowsWithTimeout(iterRows, 1);
+        List<String> expected = 
Collections.singletonList(String.format("+I[%s]", expectedRows));
+        assertThat(collected).isEqualTo(expected);
 
-        // test not support primary key now
-        String primaryTableName = prepareSourceTable(new String[] {"id"}, 
null);
+        // test not push down grouping count.
         assertThatThrownBy(
                         () ->
                                 tEnv.explainSql(
                                                 String.format(
-                                                        "SELECT COUNT(*) FROM 
%s ",
-                                                        primaryTableName))
+                                                        "SELECT COUNT(*) FROM 
%s group by id",
+                                                        tableName))
                                         .wait())
                 .hasMessageContaining(
                         "Currently, Fluss only support queries on table with 
datalake enabled or point queries on primary key when it's in batch execution 
mode.");
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/TableStatsResultForBucket.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/TableStatsResultForBucket.java
new file mode 100644
index 000000000..cdea0109e
--- /dev/null
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/entity/TableStatsResultForBucket.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.rpc.entity;
+
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.rpc.protocol.ApiError;
+
+/** Result of {@link org.apache.fluss.rpc.messages.GetTableStatsResponse} for 
each table bucket. */
+public class TableStatsResultForBucket extends ResultForBucket {
+
+    private final long rowCount;
+
+    public TableStatsResultForBucket(TableBucket tableBucket, long rowCount) {
+        super(tableBucket);
+        this.rowCount = rowCount;
+    }
+
+    public TableStatsResultForBucket(TableBucket tableBucket, ApiError error) {
+        super(tableBucket, error);
+        this.rowCount = -1;
+    }
+
+    /** Returns the row count of the table bucket. If the request is failed, 
it will return -1. */
+    public long getRowCount() {
+        return rowCount;
+    }
+}
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java
index 578b74e5e..6f20828b5 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/TabletServerGateway.java
@@ -20,6 +20,8 @@ package org.apache.fluss.rpc.gateway;
 import org.apache.fluss.rpc.RpcGateway;
 import org.apache.fluss.rpc.messages.FetchLogRequest;
 import org.apache.fluss.rpc.messages.FetchLogResponse;
+import org.apache.fluss.rpc.messages.GetTableStatsRequest;
+import org.apache.fluss.rpc.messages.GetTableStatsResponse;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.messages.InitWriterResponse;
 import org.apache.fluss.rpc.messages.LimitScanRequest;
@@ -130,6 +132,14 @@ public interface TabletServerGateway extends RpcGateway, 
AdminReadOnlyGateway {
     @RPC(api = ApiKeys.LIMIT_SCAN)
     CompletableFuture<LimitScanResponse> limitScan(LimitScanRequest request);
 
+    /**
+     * Get statistics for the specified table buckets.
+     *
+     * @return the table stats response containing per-bucket statistics.
+     */
+    @RPC(api = ApiKeys.GET_TABLE_STATS)
+    CompletableFuture<GetTableStatsResponse> 
getTableStats(GetTableStatsRequest request);
+
     /**
      * List offsets for the specified table bucket.
      *
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
index f2e62c68a..a3fd7afbd 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/client/ServerConnection.java
@@ -315,6 +315,7 @@ final class ServerConnection {
                     version = 
serverApiVersions.highestAvailableVersion(apiKey);
                 } catch (Exception e) {
                     responseFuture.completeExceptionally(e);
+                    return responseFuture;
                 }
             }
 
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index f7cc45007..f3fadec64 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -100,7 +100,8 @@ public enum ApiKeys {
     DELETE_PRODUCER_OFFSETS(1055, 0, 0, PUBLIC),
     ACQUIRE_KV_SNAPSHOT_LEASE(1056, 0, 0, PUBLIC),
     RELEASE_KV_SNAPSHOT_LEASE(1057, 0, 0, PUBLIC),
-    DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC);
+    DROP_KV_SNAPSHOT_LEASE(1058, 0, 0, PUBLIC),
+    GET_TABLE_STATS(1059, 0, 0, PUBLIC);
 
     private static final Map<Integer, ApiKeys> ID_TO_TYPE =
             Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 66f09f0dc..f04b30756 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -288,6 +288,21 @@ message LimitScanResponse{
 }
 
 
+// Get table statistics request and response.
+// Sent to TabletServer to get per-bucket statistics.
+message GetTableStatsRequest {
+  required int64 table_id = 1;
+  repeated PbTableStatsReqForBucket buckets_req = 2;
+  // Column indexes for column-level stats (e.g., ndv, sum).
+  // If empty, column stats are not collected.
+  // Reserved for future use.
+  repeated int32 target_columns = 3 [packed = true];
+}
+
+message GetTableStatsResponse {
+  repeated PbTableStatsRespForBucket buckets_resp = 1;
+}
+
 // notify bucket leader and isr request
 message NotifyLeaderAndIsrRequest {
   required int32 coordinator_epoch = 1;
@@ -1184,4 +1199,35 @@ message PbKvSnapshotLeaseForBucket {
   optional int64 partition_id = 1;
   required int32 bucket_id = 2;
   required int64 snapshot_id = 3;
+}
+
+message PbTableStatsReqForBucket {
+  optional int64 partition_id = 1;
+  required int32 bucket_id = 2;
+}
+
+message PbTableStatsRespForBucket {
+  optional int32 error_code = 1;
+  optional string error_message = 2;
+  optional int64 partition_id = 3;
+  required int32 bucket_id = 4;
+
+  // --- Table-level stats ---
+  // The number of rows in this bucket.
+  // For KV tables: the number of unique keys (live rows).
+  // For Log tables: total number of log records (highWatermark - 
logStartOffset).
+  // Absent if row count is not available (e.g., WAL changelog mode or legacy 
tables).
+  optional int64 row_count = 5;
+
+  // --- data size stats (future) ---
+  // The data size in bytes of this bucket.
+  // For KV tables: the size of the KV store.
+  // For Log tables: the size of the log segments.
+  // Reserved for future use.
+  // optional int64 data_size_bytes = 6;
+
+  // --- Column-level stats (future) ---
+  // Per-column statistics, keyed by column index.
+  // Only populated when target_columns is specified in the request.
+  // repeated PbColumnStats column_stats = 10;
 }
\ No newline at end of file
diff --git 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
index 7773ea5da..8d3a4af30 100644
--- 
a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
+++ 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java
@@ -39,6 +39,8 @@ import org.apache.fluss.rpc.messages.GetTableInfoRequest;
 import org.apache.fluss.rpc.messages.GetTableInfoResponse;
 import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
 import org.apache.fluss.rpc.messages.GetTableSchemaResponse;
+import org.apache.fluss.rpc.messages.GetTableStatsRequest;
+import org.apache.fluss.rpc.messages.GetTableStatsResponse;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.messages.InitWriterResponse;
 import org.apache.fluss.rpc.messages.LimitScanRequest;
@@ -136,6 +138,11 @@ public class TestingTabletGatewayService extends 
TestingGatewayService
         return null;
     }
 
+    @Override
+    public CompletableFuture<GetTableStatsResponse> 
getTableStats(GetTableStatsRequest request) {
+        return null;
+    }
+
     @Override
     public CompletableFuture<ListOffsetsResponse> 
listOffsets(ListOffsetsRequest request) {
         return null;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
index 5d32e413e..e200f8d60 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvRecoverHelper.java
@@ -150,9 +150,9 @@ public class KvRecoverHelper {
         if (recoverPointRowCount != null) {
             kvTablet.setRowCount(rowCountUpdater.getRowCount());
             LOG.info(
-                    "Updated row count to {} for table '{}' after recovering 
from log",
+                    "Updated row count to {} for tablet '{}' after recovering 
from log",
                     rowCountUpdater.getRowCount(),
-                    kvTablet.getTablePath());
+                    kvTablet.getTableBucket());
         } else {
             LOG.info(
                     "Skipping row count update after recovering from log, 
because this table '{}' doesn't support row count.",
@@ -163,7 +163,10 @@ public class KvRecoverHelper {
         ThrowingConsumer<KeyValueAndLogOffset, Exception> resumeRecordApplier =
                 (resumeRecord) ->
                         kvTablet.putToPreWriteBuffer(
-                                resumeRecord.key, resumeRecord.value, 
resumeRecord.logOffset);
+                                resumeRecord.changeType,
+                                resumeRecord.key,
+                                resumeRecord.value,
+                                resumeRecord.logOffset);
         readLogRecordsAndApply(
                 nextLogOffset,
                 // records in pre-write-buffer shouldn't affect the row count, 
the high-watermark
@@ -178,10 +181,10 @@ public class KvRecoverHelper {
             AutoIncIDRange newRange = autoIncIdRangeUpdater.getNewRange();
             kvTablet.updateAutoIncrementIDRange(newRange);
             LOG.info(
-                    "Updated auto inc id range to [{}, {}] for table '{}' 
after recovering from log",
+                    "Updated auto inc id range to [{}, {}] for tablet '{}' 
after recovering from log",
                     newRange.getStart(),
                     newRange.getEnd(),
-                    kvTablet.getTablePath());
+                    kvTablet.getTableBucket());
         }
     }
 
@@ -230,7 +233,7 @@ public class KvRecoverHelper {
                                 }
                                 resumeRecordConsumer.accept(
                                         new KeyValueAndLogOffset(
-                                                key, value, 
logRecord.logOffset()));
+                                                changeType, key, value, 
logRecord.logOffset()));
 
                                 // reuse the logRow instance which is usually 
a CompactedRow which
                                 // has been deserialized during toKvRow(..)
@@ -301,11 +304,14 @@ public class KvRecoverHelper {
     }
 
     private static final class KeyValueAndLogOffset {
+        private final ChangeType changeType;
         private final byte[] key;
         private final @Nullable byte[] value;
         private final long logOffset;
 
-        public KeyValueAndLogOffset(byte[] key, byte[] value, long logOffset) {
+        public KeyValueAndLogOffset(
+                ChangeType changeType, byte[] key, byte[] value, long 
logOffset) {
+            this.changeType = changeType;
             this.key = key;
             this.value = value;
             this.logOffset = logOffset;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index ca89fe829..e72428a02 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -22,6 +22,7 @@ import org.apache.fluss.compression.ArrowCompressionInfo;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.DeletionDisabledException;
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.KvStorageException;
 import org.apache.fluss.exception.SchemaNotExistException;
 import org.apache.fluss.memory.MemorySegmentPool;
@@ -295,6 +296,20 @@ public final class KvTablet {
         this.rowCount = rowCount;
     }
 
+    // rowCount is volatile, so it's safe to read without lock
+    public long getRowCount() {
+        if (rowCount == ROW_COUNT_DISABLED) {
+            throw new InvalidTableException(
+                    String.format(
+                            "Row count is disabled for this table '%s'. This 
usually happens when the table is"
+                                    + " created before v0.9 or the changelog 
image is set to WAL, "
+                                    + "as maintaining row count in WAL mode is 
costly and not necessary for most use cases. "
+                                    + "If you want to enable row count, please 
set changelog image to FULL.",
+                            getTablePath()));
+        }
+        return rowCount;
+    }
+
     /**
      * Get the current state of the tablet, including the log offset, row 
count and auto-increment
      * ID range. This is used for snapshot and recovery to capture the state 
of the tablet at a
@@ -589,7 +604,7 @@ public final class KvTablet {
             throws Exception {
         BinaryValue newValue = 
autoIncrementUpdater.updateAutoIncrementColumns(currentValue);
         walBuilder.append(ChangeType.INSERT, 
latestSchemaRow.replaceRow(newValue.row));
-        kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset);
+        kvPreWriteBuffer.insert(key, newValue.encodeValue(), logOffset);
         return logOffset + 1;
     }
 
@@ -603,12 +618,12 @@ public final class KvTablet {
             throws Exception {
         if (changelogImage == ChangelogImage.WAL) {
             walBuilder.append(ChangeType.UPDATE_AFTER, 
latestSchemaRow.replaceRow(newValue.row));
-            kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset);
+            kvPreWriteBuffer.update(key, newValue.encodeValue(), logOffset);
             return logOffset + 1;
         } else {
             walBuilder.append(ChangeType.UPDATE_BEFORE, 
latestSchemaRow.replaceRow(oldValue.row));
             walBuilder.append(ChangeType.UPDATE_AFTER, 
latestSchemaRow.replaceRow(newValue.row));
-            kvPreWriteBuffer.put(key, newValue.encodeValue(), logOffset + 1);
+            kvPreWriteBuffer.update(key, newValue.encodeValue(), logOffset + 
1);
             return logOffset + 2;
         }
     }
@@ -679,12 +694,18 @@ public final class KvTablet {
     }
 
     /** put key,value,logOffset into pre-write buffer directly. */
-    void putToPreWriteBuffer(byte[] key, @Nullable byte[] value, long 
logOffset) {
+    void putToPreWriteBuffer(
+            ChangeType changeType, byte[] key, @Nullable byte[] value, long 
logOffset) {
         KvPreWriteBuffer.Key wrapKey = KvPreWriteBuffer.Key.of(key);
-        if (value == null) {
+        if (changeType == ChangeType.DELETE && value == null) {
             kvPreWriteBuffer.delete(wrapKey, logOffset);
+        } else if (changeType == ChangeType.INSERT) {
+            kvPreWriteBuffer.insert(wrapKey, value, logOffset);
+        } else if (changeType == ChangeType.UPDATE_AFTER) {
+            kvPreWriteBuffer.update(wrapKey, value, logOffset);
         } else {
-            kvPreWriteBuffer.put(wrapKey, value, logOffset);
+            throw new IllegalArgumentException(
+                    "Unsupported change type for putToPreWriteBuffer: " + 
changeType);
         }
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
index 5908ba4b1..e14d09eac 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.kv.prewrite;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.server.kv.KvBatchWriter;
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.utils.MurmurHashUtils;
@@ -114,7 +115,7 @@ public class KvPreWriteBuffer implements AutoCloseable {
      * @param logSequenceNumber the log sequence number for the delete 
operation
      */
     public void delete(Key key, long logSequenceNumber) {
-        update(key, Value.of(null), logSequenceNumber);
+        doPut(ChangeType.DELETE, key, Value.of(null), logSequenceNumber);
     }
 
     /**
@@ -122,11 +123,20 @@ public class KvPreWriteBuffer implements AutoCloseable {
      *
      * @param logSequenceNumber the log sequence number for the put operation
      */
-    public void put(Key key, @Nullable byte[] value, long logSequenceNumber) {
-        update(key, Value.of(value), logSequenceNumber);
+    public void insert(Key key, byte[] value, long logSequenceNumber) {
+        doPut(ChangeType.INSERT, key, Value.of(value), logSequenceNumber);
     }
 
-    private void update(Key key, Value value, long lsn) {
+    /**
+     * Update an existing key-value pair.
+     *
+     * @param logSequenceNumber the log sequence number for the update operation
+     */
+    public void update(Key key, @Nullable byte[] value, long 
logSequenceNumber) {
+        doPut(ChangeType.UPDATE_AFTER, key, Value.of(value), 
logSequenceNumber);
+    }
+
+    private void doPut(ChangeType changeType, Key key, Value value, long lsn) {
         if (maxLogSequenceNumber >= lsn) {
             throw new IllegalArgumentException(
                     "The log sequence number must be non-decreasing. "
@@ -142,8 +152,8 @@ public class KvPreWriteBuffer implements AutoCloseable {
                         key,
                         (k, v) ->
                                 v == null
-                                        ? KvEntry.of(key, value, lsn)
-                                        : KvEntry.of(key, value, lsn, v));
+                                        ? KvEntry.of(changeType, key, value, 
lsn)
+                                        : KvEntry.of(changeType, key, value, 
lsn, v));
         // append the entry to the tail of the list for all kv entries
         allKvEntries.addLast(kvEntry);
         // update the max lsn
@@ -220,14 +230,19 @@ public class KvPreWriteBuffer implements AutoCloseable {
             Value value = entry.getValue();
             if (value.value != null) {
                 flushedCount += 1;
-                rowCountDiff += 1;
                 kvBatchWriter.put(entry.getKey().key, value.value);
             } else {
                 flushedCount += 1;
-                rowCountDiff -= 1;
                 kvBatchWriter.delete(entry.getKey().key);
             }
 
+            // for update_after, we don't change the row count
+            if (entry.getChangeType() == ChangeType.INSERT) {
+                rowCountDiff += 1;
+            } else if (entry.getChangeType() == ChangeType.DELETE) {
+                rowCountDiff -= 1;
+            }
+
             // if the kv entry to be flushed is equal to the one in the 
kvEntryMap, we
             // can remove it from the map. Although it's not a must to remove 
from the map,
             // we remove it to reduce the memory usage
@@ -271,6 +286,10 @@ public class KvPreWriteBuffer implements AutoCloseable {
         return truncateAsErrorCount;
     }
 
+    // 
-------------------------------------------------------------------------------------------
+    // Inner classes
+    // 
-------------------------------------------------------------------------------------------
+
     /**
      * A class to wrap a key-value pair and the sequence number for the 
key-value pair. If the byte
      * array in the value is null, it means the key in the entry is marked as 
deleted.
@@ -280,6 +299,7 @@ public class KvPreWriteBuffer implements AutoCloseable {
      */
     public static class KvEntry {
 
+        private final ChangeType changeType;
         private final Key key;
         private final Value value;
         private final long logSequenceNumber;
@@ -287,22 +307,36 @@ public class KvPreWriteBuffer implements AutoCloseable {
         // the previous mapped value in the buffer before this key-value put
         @Nullable private final KvEntry previousEntry;
 
-        public static KvEntry of(Key key, Value value, long sequenceNumber) {
-            return new KvEntry(key, value, sequenceNumber, null);
+        public static KvEntry of(ChangeType changeType, Key key, Value value, 
long sequenceNumber) {
+            return new KvEntry(changeType, key, value, sequenceNumber, null);
         }
 
-        public static KvEntry of(Key key, Value value, long sequenceNumber, 
KvEntry previousEntry) {
-            return new KvEntry(key, value, sequenceNumber, previousEntry);
+        public static KvEntry of(
+                ChangeType changeType,
+                Key key,
+                Value value,
+                long sequenceNumber,
+                KvEntry previousEntry) {
+            return new KvEntry(changeType, key, value, sequenceNumber, 
previousEntry);
         }
 
         private KvEntry(
-                Key key, Value value, long logSequenceNumber, @Nullable 
KvEntry previousEntry) {
+                ChangeType changeType,
+                Key key,
+                Value value,
+                long logSequenceNumber,
+                @Nullable KvEntry previousEntry) {
+            this.changeType = changeType;
             this.key = key;
             this.value = value;
             this.logSequenceNumber = logSequenceNumber;
             this.previousEntry = previousEntry;
         }
 
+        public ChangeType getChangeType() {
+            return changeType;
+        }
+
         public Key getKey() {
             return key;
         }
@@ -325,6 +359,7 @@ public class KvPreWriteBuffer implements AutoCloseable {
             }
             KvEntry kvEntry = (KvEntry) o;
             return logSequenceNumber == kvEntry.logSequenceNumber
+                    && changeType == kvEntry.changeType
                     && Objects.equals(key, kvEntry.key)
                     && Objects.equals(value, kvEntry.value)
                     && Objects.equals(previousEntry, kvEntry.previousEntry);
@@ -338,7 +373,9 @@ public class KvPreWriteBuffer implements AutoCloseable {
         @Override
         public String toString() {
             return "KvEntry{"
-                    + "key="
+                    + "changeType="
+                    + changeType
+                    + ", key="
                     + key
                     + ", value="
                     + value
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 371d3c128..a1fa20d5a 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -233,6 +233,10 @@ public final class LogTablet {
         return localLog.getTableBucket();
     }
 
+    public long getRowCount() {
+        return getHighWatermark() - logStartOffset();
+    }
+
     public long getHighWatermark() {
         return highWatermarkMetadata.getMessageOffset();
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 8a1621a84..10678eb5c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1379,6 +1379,21 @@ public final class Replica {
         }
     }
 
+    public long getRowCount() {
+        return inReadLock(
+                leaderIsrUpdateLock,
+                () -> {
+                    KvTablet kv = this.kvTablet;
+                    if (kv != null) {
+                        // return materialized row count for primary key table
+                        return kv.getRowCount();
+                    } else {
+                        // return log row count for non-primary key table
+                        return logTablet.getRowCount();
+                    }
+                });
+    }
+
     public long getOffset(RemoteLogManager remoteLogManager, ListOffsetsParam 
listOffsetsParam)
             throws IOException {
         return inReadLock(
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index aeb963c9b..eea783dba 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -54,6 +54,7 @@ import org.apache.fluss.rpc.entity.LookupResultForBucket;
 import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket;
 import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
 import org.apache.fluss.rpc.entity.PutKvResultForBucket;
+import org.apache.fluss.rpc.entity.TableStatsResultForBucket;
 import org.apache.fluss.rpc.entity.WriteResultForBucket;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.NotifyKvSnapshotOffsetResponse;
@@ -1265,6 +1266,23 @@ public class ReplicaManager {
         return putResultForBucketMap;
     }
 
+    public void getTableStats(
+            List<TableBucket> tableBucket,
+            Consumer<List<TableStatsResultForBucket>> responseCallback) {
+        List<TableStatsResultForBucket> results = new ArrayList<>();
+        for (TableBucket tb : tableBucket) {
+            try {
+                Replica replica = getReplicaOrException(tb);
+                long rowCount = replica.getRowCount();
+                results.add(new TableStatsResultForBucket(tb, rowCount));
+            } catch (Exception e) {
+                LOG.error("Error getting table stats on replica {}", 
tableBucket, e);
+                results.add(new TableStatsResultForBucket(tb, 
ApiError.fromThrowable(e)));
+            }
+        }
+        responseCallback.accept(results);
+    }
+
     public void limitScan(
             TableBucket tableBucket,
             int limit,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
index df2ab926f..b511f2178 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java
@@ -32,6 +32,8 @@ import org.apache.fluss.rpc.entity.ResultForBucket;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.FetchLogRequest;
 import org.apache.fluss.rpc.messages.FetchLogResponse;
+import org.apache.fluss.rpc.messages.GetTableStatsRequest;
+import org.apache.fluss.rpc.messages.GetTableStatsResponse;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.messages.InitWriterResponse;
 import org.apache.fluss.rpc.messages.LimitScanRequest;
@@ -106,9 +108,11 @@ import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getNotifySnaps
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getProduceLogData;
 import static org.apache.fluss.server.utils.ServerRpcMessageUtils.getPutKvData;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getStopReplicaData;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getTableStatsRequestData;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getTargetColumns;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.getUpdateMetadataRequestData;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeFetchLogResponse;
+import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeGetTableStatsResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeInitWriterResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeLimitScanResponse;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.makeListOffsetsResponse;
@@ -306,6 +310,17 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
         return response;
     }
 
+    @Override
+    public CompletableFuture<GetTableStatsResponse> 
getTableStats(GetTableStatsRequest request) {
+        authorizeTable(READ, request.getTableId());
+
+        CompletableFuture<GetTableStatsResponse> response = new 
CompletableFuture<>();
+        replicaManager.getTableStats(
+                getTableStatsRequestData(request),
+                result -> 
response.complete(makeGetTableStatsResponse(result)));
+        return response;
+    }
+
     @Override
     public CompletableFuture<NotifyLeaderAndIsrResponse> notifyLeaderAndIsr(
             NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 092894e0a..6f5672ff4 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -57,6 +57,7 @@ import org.apache.fluss.rpc.entity.LookupResultForBucket;
 import org.apache.fluss.rpc.entity.PrefixLookupResultForBucket;
 import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
 import org.apache.fluss.rpc.entity.PutKvResultForBucket;
+import org.apache.fluss.rpc.entity.TableStatsResultForBucket;
 import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseRequest;
 import org.apache.fluss.rpc.messages.AcquireKvSnapshotLeaseResponse;
 import org.apache.fluss.rpc.messages.AdjustIsrRequest;
@@ -74,6 +75,8 @@ import 
org.apache.fluss.rpc.messages.GetKvSnapshotMetadataResponse;
 import org.apache.fluss.rpc.messages.GetLakeSnapshotResponse;
 import org.apache.fluss.rpc.messages.GetLatestKvSnapshotsResponse;
 import org.apache.fluss.rpc.messages.GetProducerOffsetsResponse;
+import org.apache.fluss.rpc.messages.GetTableStatsRequest;
+import org.apache.fluss.rpc.messages.GetTableStatsResponse;
 import org.apache.fluss.rpc.messages.InitWriterResponse;
 import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
 import org.apache.fluss.rpc.messages.LimitScanResponse;
@@ -146,6 +149,8 @@ import org.apache.fluss.rpc.messages.PbTableBucket;
 import org.apache.fluss.rpc.messages.PbTableMetadata;
 import org.apache.fluss.rpc.messages.PbTableOffsets;
 import org.apache.fluss.rpc.messages.PbTablePath;
+import org.apache.fluss.rpc.messages.PbTableStatsReqForBucket;
+import org.apache.fluss.rpc.messages.PbTableStatsRespForBucket;
 import org.apache.fluss.rpc.messages.PbValue;
 import org.apache.fluss.rpc.messages.PbValueList;
 import org.apache.fluss.rpc.messages.PrefixLookupRequest;
@@ -2061,6 +2066,45 @@ public class ServerRpcMessageUtils {
                 leaseForBucket.getSnapshotId());
     }
 
+    // 
-----------------------------------------------------------------------------------
+    // Table Stats Request and Response
+    // 
-----------------------------------------------------------------------------------
+
+    public static List<TableBucket> 
getTableStatsRequestData(GetTableStatsRequest request) {
+        List<TableBucket> tableBuckets = new ArrayList<>();
+        long tableId = request.getTableId();
+        for (PbTableStatsReqForBucket reqForBucket : 
request.getBucketsReqsList()) {
+            tableBuckets.add(
+                    new TableBucket(
+                            tableId,
+                            reqForBucket.hasPartitionId() ? 
reqForBucket.getPartitionId() : null,
+                            reqForBucket.getBucketId()));
+        }
+        return tableBuckets;
+    }
+
+    public static GetTableStatsResponse makeGetTableStatsResponse(
+            List<TableStatsResultForBucket> stats) {
+        GetTableStatsResponse response = new GetTableStatsResponse();
+        for (TableStatsResultForBucket statForBucket : stats) {
+            TableBucket tb = statForBucket.getTableBucket();
+            PbTableStatsRespForBucket respForBucket =
+                    response.addBucketsResp().setBucketId(tb.getBucket());
+            if (tb.getPartitionId() != null) {
+                respForBucket.setPartitionId(tb.getPartitionId());
+            }
+            if (statForBucket.failed()) {
+                respForBucket.setError(
+                        statForBucket.getErrorCode(), 
statForBucket.getErrorMessage());
+            } else {
+                respForBucket.setRowCount(statForBucket.getRowCount());
+            }
+        }
+        return response;
+    }
+
+    // 
-----------------------------------------------------------------------------------
+
     private static <T> Map<TableBucket, T> mergeResponse(
             Map<TableBucket, T> response, Map<TableBucket, T> errors) {
         if (errors.isEmpty()) {
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 1d59af396..da918f45e 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -17,8 +17,10 @@
 
 package org.apache.fluss.server.kv;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.InvalidTargetColumnException;
 import org.apache.fluss.exception.OutOfOrderSequenceException;
 import org.apache.fluss.memory.TestingMemorySegmentPool;
@@ -715,16 +717,19 @@ class KvTabletTest {
 
         KvEntry kv1 =
                 KvEntry.of(
+                        ChangeType.INSERT,
                         Key.of("k1".getBytes()),
                         valueOf(compactedRow(baseRowType, new Object[] {1, 
"v11"})),
                         0);
         KvEntry kv2 =
                 KvEntry.of(
+                        ChangeType.INSERT,
                         Key.of("k2".getBytes()),
                         valueOf(compactedRow(baseRowType, new Object[] {2, 
"v21"})),
                         1);
         KvEntry kv3 =
                 KvEntry.of(
+                        ChangeType.UPDATE_AFTER,
                         Key.of("k2".getBytes()),
                         valueOf(compactedRow(baseRowType, new Object[] {2, 
"v23"})),
                         3,
@@ -1664,4 +1669,142 @@ class KvTabletTest {
         assertThat(statistics.getBlockCacheMemoryUsage()).isEqualTo(0);
         assertThat(statistics.getBlockCachePinnedUsage()).isEqualTo(0);
     }
+
+    // ==================== Row Count Tests ====================
+
+    @Test
+    void testRowCountBasic() throws Exception {
+        initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
+
+        // Initially row count should be 0
+        assertThat(kvTablet.getRowCount()).isEqualTo(0);
+
+        // Insert 5 records
+        for (int i = 1; i <= 5; i++) {
+            KvRecordBatch batch =
+                    kvRecordBatchFactory.ofRecords(
+                            kvRecordFactory.ofRecord("key" + i, new Object[] 
{i, "val" + i}));
+            kvTablet.putAsLeader(batch, null);
+        }
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        // Row count should be 5
+        assertThat(kvTablet.getRowCount()).isEqualTo(5);
+
+        // Delete 2 records
+        for (int i = 1; i <= 2; i++) {
+            KvRecordBatch deleteBatch =
+                    
kvRecordBatchFactory.ofRecords(kvRecordFactory.ofRecord("key" + i, null));
+            kvTablet.putAsLeader(deleteBatch, null);
+        }
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        // Row count should be 3
+        assertThat(kvTablet.getRowCount()).isEqualTo(3);
+
+        kvTablet.close();
+    }
+
+    @Test
+    void testRowCountWithUpsert() throws Exception {
+        initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
+
+        // Initially row count should be 0
+        assertThat(kvTablet.getRowCount()).isEqualTo(0);
+
+        // Insert a record
+        KvRecordBatch batch1 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("key1", new Object[] {1, 
"val1"}));
+        kvTablet.putAsLeader(batch1, null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+        assertThat(kvTablet.getRowCount()).isEqualTo(1);
+
+        // Update the same record (upsert) - row count should not change
+        KvRecordBatch batch2 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("key1", new Object[] {1, 
"val1_updated"}));
+        kvTablet.putAsLeader(batch2, null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+        assertThat(kvTablet.getRowCount()).isEqualTo(1);
+
+        // Insert another record
+        KvRecordBatch batch3 =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("key2", new Object[] {2, 
"val2"}));
+        kvTablet.putAsLeader(batch3, null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+        assertThat(kvTablet.getRowCount()).isEqualTo(2);
+
+        kvTablet.close();
+    }
+
+    @Test
+    void testRowCountDisabledForWalChangelog() throws Exception {
+        // Create KvTablet with WAL changelog mode
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put(ConfigOptions.TABLE_CHANGELOG_IMAGE.key(), "WAL");
+        initLogTabletAndKvTablet(DATA1_SCHEMA_PK, tableConfig);
+
+        // Insert some records
+        KvRecordBatch batch =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("key1", new Object[] {1, 
"val1"}));
+        kvTablet.putAsLeader(batch, null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+
+        // Getting row count should throw exception for WAL changelog mode
+        assertThatThrownBy(() -> kvTablet.getRowCount())
+                .isInstanceOf(InvalidTableException.class)
+                .hasMessageContaining("Row count is disabled for this table");
+
+        kvTablet.close();
+    }
+
+    @Test
+    void testRowCountWithMixedOperations() throws Exception {
+        initLogTabletAndKvTablet(DATA1_SCHEMA_PK, new HashMap<>());
+
+        // Initially row count should be 0
+        assertThat(kvTablet.getRowCount()).isEqualTo(0);
+
+        // Batch of mixed operations: insert 10 records
+        List<KvRecord> records = new ArrayList<>();
+        for (int i = 1; i <= 10; i++) {
+            records.add(kvRecordFactory.ofRecord("key" + i, new Object[] {i, 
"val" + i}));
+        }
+        KvRecordBatch insertBatch = kvRecordBatchFactory.ofRecords(records);
+        kvTablet.putAsLeader(insertBatch, null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+        assertThat(kvTablet.getRowCount()).isEqualTo(10);
+
+        // Delete some, insert new ones, update existing ones in sequence
+        // Delete keys 1-3
+        List<KvRecord> deleteRecords = new ArrayList<>();
+        for (int i = 1; i <= 3; i++) {
+            deleteRecords.add(kvRecordFactory.ofRecord("key" + i, null));
+        }
+        kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(deleteRecords), 
null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+        assertThat(kvTablet.getRowCount()).isEqualTo(7);
+
+        // Insert new keys 11-15
+        List<KvRecord> newRecords = new ArrayList<>();
+        for (int i = 11; i <= 15; i++) {
+            newRecords.add(kvRecordFactory.ofRecord("key" + i, new Object[] 
{i, "val" + i}));
+        }
+        kvTablet.putAsLeader(kvRecordBatchFactory.ofRecords(newRecords), null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+        assertThat(kvTablet.getRowCount()).isEqualTo(12);
+
+        // Update existing key 4 - row count should not change
+        KvRecordBatch updateBatch =
+                kvRecordBatchFactory.ofRecords(
+                        kvRecordFactory.ofRecord("key4", new Object[] {4, 
"val4_updated"}));
+        kvTablet.putAsLeader(updateBatch, null);
+        kvTablet.flush(Long.MAX_VALUE, NOPErrorHandler.INSTANCE);
+        assertThat(kvTablet.getRowCount()).isEqualTo(12);
+
+        kvTablet.close();
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
index b39ec39ac..cb07c2d66 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
@@ -25,6 +25,8 @@ import org.junit.jupiter.api.Test;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -36,10 +38,10 @@ class KvPreWriteBufferTest {
         KvPreWriteBuffer buffer =
                 new KvPreWriteBuffer(
                         new NopKvBatchWriter(), 
TestingMetricGroups.TABLET_SERVER_METRICS);
-        bufferPut(buffer, "key1", "value1", 1);
+        bufferInsert(buffer, "key1", "value1", 1);
         bufferDelete(buffer, "key1", 3);
 
-        assertThatThrownBy(() -> bufferPut(buffer, "key2", "value2", 2))
+        assertThatThrownBy(() -> bufferInsert(buffer, "key2", "value2", 2))
                 .isInstanceOf(IllegalArgumentException.class)
                 .hasMessage(
                         "The log sequence number must be non-decreasing. The 
current "
@@ -61,7 +63,7 @@ class KvPreWriteBufferTest {
 
         // put a series of kv entries
         for (int i = 0; i < 3; i++) {
-            bufferPut(buffer, "key" + i, "value" + i, elementCount++);
+            bufferInsert(buffer, "key" + i, "value" + i, elementCount++);
         }
         // check the key and value;
         for (int i = 0; i < 3; i++) {
@@ -96,7 +98,7 @@ class KvPreWriteBufferTest {
         assertThat(getValue(buffer, "key2")).isNull();
 
         // put key2 again
-        bufferPut(buffer, "key2", "value21", elementCount++);
+        bufferInsert(buffer, "key2", "value21", elementCount++);
         // we can get key2
         assertThat(getValue(buffer, "key2")).isEqualTo("value21");
 
@@ -113,9 +115,9 @@ class KvPreWriteBufferTest {
         }
 
         // put two key3;
-        bufferPut(buffer, "key3", "value31", elementCount++);
-        bufferPut(buffer, "key3", "value32", elementCount++);
-        bufferPut(buffer, "key2", "value22", elementCount++);
+        bufferInsert(buffer, "key3", "value31", elementCount++);
+        bufferInsert(buffer, "key3", "value32", elementCount++);
+        bufferInsert(buffer, "key2", "value22", elementCount++);
         // check get key3 get the latest value
         assertThat(getValue(buffer, "key3")).isEqualTo("value32");
         // check get key2
@@ -144,7 +146,7 @@ class KvPreWriteBufferTest {
 
         // put a series of kv entries
         for (int i = 0; i < 10; i++) {
-            bufferPut(buffer, "key" + i, "value" + i, elementCount++);
+            bufferInsert(buffer, "key" + i, "value" + i, elementCount++);
         }
         // check the key and value;
         for (int i = 0; i < 10; i++) {
@@ -170,8 +172,8 @@ class KvPreWriteBufferTest {
         assertThat(getValue(buffer, "key3")).isNull();
 
         // add update records
-        bufferPut(buffer, "key2", "value2-1", elementCount++);
-        bufferPut(buffer, "key1", "value1-1", elementCount++);
+        bufferInsert(buffer, "key2", "value2-1", elementCount++);
+        bufferInsert(buffer, "key1", "value1-1", elementCount++);
         assertThat(getValue(buffer, "key1")).isEqualTo("value1-1");
         assertThat(buffer.getMaxLSN()).isEqualTo(elementCount - 1);
         buffer.truncateTo(5, TruncateReason.ERROR);
@@ -190,9 +192,60 @@ class KvPreWriteBufferTest {
         assertThat(buffer.getKvEntryMap().size()).isEqualTo(0);
     }
 
-    private static void bufferPut(
+    @Test
+    void testRowCount() throws IOException {
+        KvPreWriteBuffer buffer =
+                new KvPreWriteBuffer(
+                        new NopKvBatchWriter(), 
TestingMetricGroups.TABLET_SERVER_METRICS);
+        int elementCount = 0;
+
+        // put a series of kv entries
+        for (int i = 0; i < 10; i++) {
+            bufferInsert(buffer, "key" + i, "value" + i, elementCount++);
+        }
+        assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(10);
+
+        // delete some keys
+        for (int i = 0; i < 5; i++) {
+            bufferDelete(buffer, "key" + i, elementCount++);
+        }
+        assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(-5);
+
+        // update an existing key (update does not change the row count)
+        for (int i = 8; i < 9; i++) {
+            bufferUpdate(buffer, "key" + i, "value" + i, elementCount++);
+        }
+        assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(0);
+
+        // insert a batch of new keys
+        for (int i = 10; i < 20; i++) {
+            bufferInsert(buffer, "key" + i, "value" + i, elementCount++);
+        }
+        for (int i = 10; i < 13; i++) {
+            bufferDelete(buffer, "key" + i, elementCount++);
+        }
+        // restore to here, so the row count should be 10 - 3 = 7
+        int checkpoint = elementCount;
+        for (int i = 30; i < 35; i++) {
+            bufferInsert(buffer, "key" + i, "value" + i, elementCount++);
+        }
+        for (int i = 30; i < 35; i++) {
+            bufferUpdate(buffer, "key" + i, "value" + i, elementCount++);
+        }
+
+        // truncate back to the checkpoint, discarding the later inserts/updates
+        buffer.truncateTo(checkpoint, TruncateReason.ERROR);
+        assertThat(buffer.flush(Long.MAX_VALUE)).isEqualTo(7);
+    }
+
+    private static void bufferInsert(
+            KvPreWriteBuffer kvPreWriteBuffer, String key, String value, int 
elementCount) {
+        kvPreWriteBuffer.insert(toKey(key), value.getBytes(), elementCount);
+    }
+
+    private static void bufferUpdate(
             KvPreWriteBuffer kvPreWriteBuffer, String key, String value, int 
elementCount) {
-        kvPreWriteBuffer.put(toKey(key), value.getBytes(), elementCount);
+        kvPreWriteBuffer.update(toKey(key), value.getBytes(), elementCount);
     }
 
     private static void bufferDelete(
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
index 932c17f23..8ca1b8e27 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/KvReplicaRestoreITCase.java
@@ -30,6 +30,7 @@ import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.rpc.gateway.TabletServerGateway;
 import org.apache.fluss.rpc.messages.PbLookupRespForBucket;
 import org.apache.fluss.rpc.messages.PutKvRequest;
+import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import 
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 import org.apache.fluss.utils.ExceptionUtils;
@@ -44,6 +45,7 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.fluss.record.TestData.DATA1_SCHEMA_PK;
@@ -56,6 +58,7 @@ import static 
org.apache.fluss.testutils.DataTestUtils.genKvRecords;
 import static org.apache.fluss.testutils.DataTestUtils.getKeyValuePairs;
 import static org.apache.fluss.testutils.DataTestUtils.toKvRecordBatch;
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** The IT case for the restoring of kv replica. */
 class KvReplicaRestoreITCase {
@@ -200,6 +203,63 @@ class KvReplicaRestoreITCase {
         }
     }
 
+    @Test
+    void testRowCountRecoveryAfterFailover() throws Exception {
+        // Create a primary key table
+        TableDescriptor tableDescriptor =
+                
TableDescriptor.builder().schema(DATA1_SCHEMA_PK).distributedBy(1, "a").build();
+        TablePath tablePath = TablePath.of("test_db", 
"test_row_count_recovery");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, 
tableDescriptor);
+        TableBucket tableBucket = new TableBucket(tableId, 0);
+
+        // Wait for replica to become leader
+        FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+        int leaderServer = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket);
+
+        // Insert 100 records
+        int recordCount = 100;
+        List<KvRecord> records = new ArrayList<>();
+        for (int i = 0; i < recordCount; i++) {
+            records.addAll(genKvRecords(new Object[] {i, "value" + i}));
+        }
+        putRecordBatch(tableBucket, leaderServer, 
toKvRecordBatch(records)).join();
+
+        // Verify initial row count
+        Replica leaderReplica = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+        assertThat(leaderReplica.getRowCount()).isEqualTo(recordCount);
+
+        FLUSS_CLUSTER_EXTENSION.triggerAndWaitSnapshot(tablePath);
+
+        // Verify the snapshot contains row count
+        CompletedSnapshot completedSnapshot =
+                completedSnapshotHandleStore
+                        .getLatestCompletedSnapshotHandle(tableBucket)
+                        .get()
+                        .retrieveCompleteSnapshot();
+        assertThat(completedSnapshot.getRowCount()).isNotNull();
+        assertThat(completedSnapshot.getRowCount()).isEqualTo(recordCount);
+
+        // Trigger another write to generate changelogs not in snapshot
+        List<KvRecord> moreRecords = new ArrayList<>();
+        for (int i = recordCount; i < recordCount + 10; i++) {
+            moreRecords.addAll(genKvRecords(new Object[] {i, "value" + i}));
+        }
+        recordCount = recordCount + 10;
+        putRecordBatch(tableBucket, leaderServer, 
toKvRecordBatch(moreRecords)).join();
+
+        // simulate failure and force failover
+        int currentLeader = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeader(tableBucket);
+        FLUSS_CLUSTER_EXTENSION.stopTabletServer(currentLeader);
+        FLUSS_CLUSTER_EXTENSION.startTabletServer(currentLeader);
+
+        // Get the new leader replica
+        Replica newLeaderReplica = 
FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+        assertThat(newLeaderReplica.getLeaderId()).isNotEqualTo(currentLeader);
+
+        // Verify the row count is restored correctly after failover and 
applied changelogs
+        assertThat(newLeaderReplica.getRowCount()).isEqualTo(recordCount);
+    }
+
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
@@ -213,13 +273,13 @@ class KvReplicaRestoreITCase {
         return conf;
     }
 
-    private void putRecordBatch(
+    private CompletableFuture<?> putRecordBatch(
             TableBucket tableBucket, int leaderServer, KvRecordBatch 
kvRecordBatch) {
         PutKvRequest putKvRequest =
                 newPutKvRequest(
                         tableBucket.getTableId(), tableBucket.getBucket(), -1, 
kvRecordBatch);
         TabletServerGateway leaderGateway =
                 
FLUSS_CLUSTER_EXTENSION.newTabletServerClientForNode(leaderServer);
-        leaderGateway.putKv(putKvRequest);
+        return leaderGateway.putKv(putKvRequest);
     }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
index 617026fa2..317dee0a2 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java
@@ -44,6 +44,8 @@ import org.apache.fluss.rpc.messages.GetTableInfoRequest;
 import org.apache.fluss.rpc.messages.GetTableInfoResponse;
 import org.apache.fluss.rpc.messages.GetTableSchemaRequest;
 import org.apache.fluss.rpc.messages.GetTableSchemaResponse;
+import org.apache.fluss.rpc.messages.GetTableStatsRequest;
+import org.apache.fluss.rpc.messages.GetTableStatsResponse;
 import org.apache.fluss.rpc.messages.InitWriterRequest;
 import org.apache.fluss.rpc.messages.InitWriterResponse;
 import org.apache.fluss.rpc.messages.LimitScanRequest;
@@ -204,6 +206,11 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
         return null;
     }
 
+    @Override
+    public CompletableFuture<GetTableStatsResponse> 
getTableStats(GetTableStatsRequest request) {
+        return null;
+    }
+
     @Override
     public CompletableFuture<ListOffsetsResponse> 
listOffsets(ListOffsetsRequest request) {
         return null;


Reply via email to